mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-07 07:18:03 +00:00
validator_discovery: simplification (#3009)
* validator_discovery: simplification * compilation fixes * compilation fixes II * compilation fixes III * compilation fixes IV
This commit is contained in:
@@ -16,12 +16,12 @@
|
||||
|
||||
//! PoV requester takes care of requesting PoVs from validators of a backing group.
|
||||
|
||||
use futures::{FutureExt, channel::{mpsc, oneshot}, future::BoxFuture};
|
||||
use futures::{FutureExt, channel::oneshot, future::BoxFuture};
|
||||
use lru::LruCache;
|
||||
|
||||
use polkadot_subsystem::jaeger;
|
||||
use polkadot_node_network_protocol::{
|
||||
PeerId, peer_set::PeerSet,
|
||||
peer_set::PeerSet,
|
||||
request_response::{OutgoingRequest, Recipient, request::{RequestError, Requests},
|
||||
v1::{PoVFetchingRequest, PoVFetchingResponse}}
|
||||
};
|
||||
@@ -46,7 +46,7 @@ pub struct PoVRequester {
|
||||
///
|
||||
/// So we keep an LRU for managing connection requests of size 2.
|
||||
/// Cache will contain `None` if we are not a validator in that session.
|
||||
connected_validators: LruCache<SessionIndex, Option<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>>,
|
||||
connected_validators: LruCache<SessionIndex, Option<oneshot::Sender<()>>>,
|
||||
}
|
||||
|
||||
impl PoVRequester {
|
||||
@@ -78,8 +78,8 @@ impl PoVRequester {
|
||||
if self.connected_validators.contains(&session_index) {
|
||||
continue
|
||||
}
|
||||
let rx = connect_to_relevant_validators(ctx, runtime, parent, session_index).await?;
|
||||
self.connected_validators.put(session_index, rx);
|
||||
let tx = connect_to_relevant_validators(ctx, runtime, parent, session_index).await?;
|
||||
self.connected_validators.put(session_index, tx);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -190,17 +190,16 @@ async fn connect_to_relevant_validators<Context>(
|
||||
parent: Hash,
|
||||
session: SessionIndex
|
||||
)
|
||||
-> super::Result<Option<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>>
|
||||
-> super::Result<Option<oneshot::Sender<()>>>
|
||||
where
|
||||
Context: SubsystemContext,
|
||||
{
|
||||
if let Some(validator_ids) = determine_relevant_validators(ctx, runtime, parent, session).await? {
|
||||
// We don't actually care about `PeerId`s, just keeping receiver so we stay connected:
|
||||
let (tx, rx) = mpsc::channel(0);
|
||||
let (tx, keep_alive) = oneshot::channel();
|
||||
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators {
|
||||
validator_ids, peer_set: PeerSet::Validation, connected: tx
|
||||
validator_ids, peer_set: PeerSet::Validation, keep_alive
|
||||
})).await;
|
||||
Ok(Some(rx))
|
||||
Ok(Some(tx))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@
|
||||
use parity_scale_codec::{Encode, Decode};
|
||||
use parking_lot::Mutex;
|
||||
use futures::prelude::*;
|
||||
use futures::channel::mpsc;
|
||||
use sc_network::Event as NetworkEvent;
|
||||
use sp_consensus::SyncOracle;
|
||||
|
||||
@@ -36,7 +35,7 @@ use polkadot_subsystem::messages::{
|
||||
NetworkBridgeMessage, AllMessages,
|
||||
CollatorProtocolMessage, NetworkBridgeEvent,
|
||||
};
|
||||
use polkadot_primitives::v1::{Hash, BlockNumber, AuthorityDiscoveryId};
|
||||
use polkadot_primitives::v1::{Hash, BlockNumber};
|
||||
use polkadot_node_network_protocol::{
|
||||
PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView, UnifiedReputationChange as Rep,
|
||||
ObservedRole,
|
||||
@@ -314,12 +313,6 @@ impl From<SubsystemError> for UnexpectedAbort {
|
||||
}
|
||||
}
|
||||
|
||||
// notifications to be passed through to the validator discovery worker.
|
||||
enum ValidatorDiscoveryNotification {
|
||||
PeerConnected(PeerId, PeerSet, Option<AuthorityDiscoveryId>),
|
||||
PeerDisconnected(PeerId, PeerSet),
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
struct Shared(Arc<Mutex<SharedInner>>);
|
||||
|
||||
@@ -339,7 +332,6 @@ async fn handle_subsystem_messages<Context, N, AD>(
|
||||
mut ctx: Context,
|
||||
mut network_service: N,
|
||||
mut authority_discovery_service: AD,
|
||||
validator_discovery_notifications: mpsc::Receiver<ValidatorDiscoveryNotification>,
|
||||
shared: Shared,
|
||||
sync_oracle: Box<dyn SyncOracle + Send>,
|
||||
metrics: Metrics,
|
||||
@@ -356,8 +348,6 @@ where
|
||||
|
||||
let mut mode = Mode::Syncing(sync_oracle);
|
||||
|
||||
let mut validator_discovery_notifications = validator_discovery_notifications.fuse();
|
||||
|
||||
loop {
|
||||
futures::select! {
|
||||
msg = ctx.recv().fuse() => match msg {
|
||||
@@ -514,7 +504,7 @@ where
|
||||
NetworkBridgeMessage::ConnectToValidators {
|
||||
validator_ids,
|
||||
peer_set,
|
||||
connected,
|
||||
keep_alive,
|
||||
} => {
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
@@ -527,7 +517,7 @@ where
|
||||
let (ns, ads) = validator_discovery.on_request(
|
||||
validator_ids,
|
||||
peer_set,
|
||||
connected,
|
||||
keep_alive,
|
||||
network_service,
|
||||
authority_discovery_service,
|
||||
).await;
|
||||
@@ -538,19 +528,6 @@ where
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
},
|
||||
notification = validator_discovery_notifications.next().fuse() => match notification {
|
||||
None => return Ok(()),
|
||||
Some(ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_auth)) => {
|
||||
validator_discovery.on_peer_connected(
|
||||
peer.clone(),
|
||||
peer_set,
|
||||
maybe_auth,
|
||||
).await;
|
||||
}
|
||||
Some(ValidatorDiscoveryNotification::PeerDisconnected(peer, peer_set)) => {
|
||||
validator_discovery.on_peer_disconnected(&peer, peer_set);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -560,7 +537,6 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
|
||||
mut network_service: impl Network,
|
||||
mut authority_discovery_service: AD,
|
||||
mut request_multiplexer: RequestMultiplexer,
|
||||
mut validator_discovery_notifications: mpsc::Sender<ValidatorDiscoveryNotification>,
|
||||
metrics: Metrics,
|
||||
shared: Shared,
|
||||
) -> Result<(), UnexpectedAbort> {
|
||||
@@ -612,13 +588,6 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
|
||||
authority_discovery_service
|
||||
.get_authority_id_by_peer_id(peer).await;
|
||||
|
||||
// Failure here means that the other side of the network bridge
|
||||
// has concluded and this future will be dropped in due course.
|
||||
let _ = validator_discovery_notifications.send(
|
||||
ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_authority.clone())
|
||||
).await;
|
||||
|
||||
|
||||
match peer_set {
|
||||
PeerSet::Validation => {
|
||||
dispatch_validation_events_to_all(
|
||||
@@ -694,12 +663,6 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
|
||||
w
|
||||
};
|
||||
|
||||
// Failure here means that the other side of the network bridge
|
||||
// has concluded and this future will be dropped in due course.
|
||||
let _ = validator_discovery_notifications.send(
|
||||
ValidatorDiscoveryNotification::PeerDisconnected(peer.clone(), peer_set)
|
||||
).await;
|
||||
|
||||
if was_connected {
|
||||
match peer_set {
|
||||
PeerSet::Validation => dispatch_validation_event_to_all(
|
||||
@@ -858,14 +821,11 @@ where
|
||||
.get_statement_fetching()
|
||||
.expect("Gets initialized, must be `Some` on startup. qed.");
|
||||
|
||||
let (validation_worker_tx, validation_worker_rx) = mpsc::channel(1024);
|
||||
|
||||
let (remote, network_event_handler) = handle_network_messages(
|
||||
ctx.sender().clone(),
|
||||
network_service.clone(),
|
||||
authority_discovery_service.clone(),
|
||||
request_multiplexer,
|
||||
validation_worker_tx,
|
||||
metrics.clone(),
|
||||
shared.clone(),
|
||||
).remote_handle();
|
||||
@@ -880,7 +840,6 @@ where
|
||||
ctx,
|
||||
network_service,
|
||||
authority_discovery_service,
|
||||
validation_worker_rx,
|
||||
shared,
|
||||
sync_oracle,
|
||||
metrics,
|
||||
|
||||
@@ -22,9 +22,9 @@ use core::marker::PhantomData;
|
||||
use std::collections::{HashSet, HashMap, hash_map};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::channel::mpsc;
|
||||
use futures::channel::oneshot;
|
||||
|
||||
use sc_network::{config::parse_addr, multiaddr::Multiaddr};
|
||||
use sc_network::multiaddr::Multiaddr;
|
||||
use sc_authority_discovery::Service as AuthorityDiscoveryService;
|
||||
use polkadot_node_network_protocol::PeerId;
|
||||
use polkadot_primitives::v1::AuthorityDiscoveryId;
|
||||
@@ -55,39 +55,24 @@ impl AuthorityDiscovery for AuthorityDiscoveryService {
|
||||
/// This struct tracks the state for one `ConnectToValidators` request.
|
||||
struct NonRevokedConnectionRequestState {
|
||||
requested: Vec<AuthorityDiscoveryId>,
|
||||
pending: HashSet<AuthorityDiscoveryId>,
|
||||
sender: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
|
||||
keep_alive: oneshot::Receiver<()>,
|
||||
}
|
||||
|
||||
impl NonRevokedConnectionRequestState {
|
||||
/// Create a new instance of `ConnectToValidatorsState`.
|
||||
pub fn new(
|
||||
requested: Vec<AuthorityDiscoveryId>,
|
||||
pending: HashSet<AuthorityDiscoveryId>,
|
||||
sender: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
|
||||
keep_alive: oneshot::Receiver<()>,
|
||||
) -> Self {
|
||||
Self {
|
||||
requested,
|
||||
pending,
|
||||
sender,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn on_authority_connected(
|
||||
&mut self,
|
||||
authority: &AuthorityDiscoveryId,
|
||||
peer_id: &PeerId,
|
||||
) {
|
||||
if self.pending.remove(authority) {
|
||||
// an error may happen if the request was revoked or
|
||||
// the channel's buffer is full, ignoring it is fine
|
||||
let _ = self.sender.try_send((authority.clone(), peer_id.clone()));
|
||||
keep_alive,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `true` if the request is revoked.
|
||||
pub fn is_revoked(&mut self) -> bool {
|
||||
self.sender.is_closed()
|
||||
self.keep_alive.try_recv().is_err()
|
||||
}
|
||||
|
||||
pub fn requested(&self) -> &[AuthorityDiscoveryId] {
|
||||
@@ -120,8 +105,6 @@ pub(super) struct Service<N, AD> {
|
||||
|
||||
#[derive(Default)]
|
||||
struct StatePerPeerSet {
|
||||
// Peers that are connected to us and authority ids associated to them.
|
||||
connected_peers: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
|
||||
// The `u64` counts the number of pending non-revoked requests for this validator
|
||||
// note: the validators in this map are not necessarily present
|
||||
// in the `connected_validators` map.
|
||||
@@ -138,97 +121,27 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Find connected validators using the given `validator_ids`.
|
||||
///
|
||||
/// Returns a [`HashMap`] that contains the found [`AuthorityDiscoveryId`]'s and their associated [`PeerId`]'s.
|
||||
#[tracing::instrument(level = "trace", skip(self, authority_discovery_service), fields(subsystem = LOG_TARGET))]
|
||||
async fn find_connected_validators(
|
||||
&mut self,
|
||||
validator_ids: &[AuthorityDiscoveryId],
|
||||
peer_set: PeerSet,
|
||||
authority_discovery_service: &mut AD,
|
||||
) -> HashMap<AuthorityDiscoveryId, PeerId> {
|
||||
let mut result = HashMap::new();
|
||||
let state = &mut self.state[peer_set];
|
||||
|
||||
for id in validator_ids {
|
||||
// First check if we already cached the validator
|
||||
if let Some(pid) = state.connected_peers
|
||||
.iter()
|
||||
.find_map(|(pid, ids)| {
|
||||
if ids.contains(&id) {
|
||||
Some(pid)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
{
|
||||
result.insert(id.clone(), pid.clone());
|
||||
continue;
|
||||
}
|
||||
|
||||
// If not ask the authority discovery
|
||||
if let Some(addresses) = authority_discovery_service.get_addresses_by_authority_id(id.clone()).await {
|
||||
for (peer_id, _) in addresses.into_iter().filter_map(|a| parse_addr(a).ok()) {
|
||||
if let Some(ids) = state.connected_peers.get_mut(&peer_id) {
|
||||
ids.insert(id.clone());
|
||||
result.insert(id.clone(), peer_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// On a new connection request, a priority group update will be issued.
|
||||
/// On a new connection request, a peer set update will be issued.
|
||||
/// It will ask the network to connect to the validators and not disconnect
|
||||
/// from them at least until all the pending requests containing them are revoked.
|
||||
///
|
||||
/// This method will also clean up all previously revoked requests.
|
||||
/// it takes `network_service` and `authority_discovery_service` by value
|
||||
/// and returns them as a workaround for the Future: Send requirement imposed by async fn impl.
|
||||
#[tracing::instrument(level = "trace", skip(self, connected, network_service, authority_discovery_service), fields(subsystem = LOG_TARGET))]
|
||||
pub async fn on_request(
|
||||
&mut self,
|
||||
validator_ids: Vec<AuthorityDiscoveryId>,
|
||||
peer_set: PeerSet,
|
||||
mut connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
|
||||
keep_alive: oneshot::Receiver<()>,
|
||||
mut network_service: N,
|
||||
mut authority_discovery_service: AD,
|
||||
) -> (N, AD) {
|
||||
const MAX_ADDR_PER_PEER: usize = 3;
|
||||
|
||||
let already_connected = self.find_connected_validators(
|
||||
&validator_ids,
|
||||
peer_set,
|
||||
&mut authority_discovery_service,
|
||||
).await;
|
||||
|
||||
let state = &mut self.state[peer_set];
|
||||
// Increment the counter of how many times the validators were requested.
|
||||
validator_ids.iter().for_each(|id| *state.requested_validators.entry(id.clone()).or_default() += 1);
|
||||
|
||||
// try to send already connected peers
|
||||
for (id, peer) in already_connected.iter() {
|
||||
match connected.try_send((id.clone(), peer.clone())) {
|
||||
Err(e) if e.is_disconnected() => {
|
||||
// the request is already revoked
|
||||
for peer_id in validator_ids {
|
||||
let _ = on_revoke(&mut state.requested_validators, peer_id);
|
||||
}
|
||||
return (network_service, authority_discovery_service);
|
||||
}
|
||||
Err(_) => {
|
||||
// the channel's buffer is full
|
||||
// ignore the error, the receiver will miss out some peers
|
||||
// but that's fine
|
||||
break;
|
||||
}
|
||||
Ok(()) => continue,
|
||||
}
|
||||
}
|
||||
|
||||
// collect multiaddress of validators
|
||||
let mut multiaddr_to_add = HashSet::new();
|
||||
for authority in validator_ids.iter() {
|
||||
@@ -292,45 +205,13 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
||||
multiaddr_to_remove.clone()
|
||||
).await;
|
||||
|
||||
let pending = validator_ids.iter()
|
||||
.cloned()
|
||||
.filter(|id| !already_connected.contains_key(id))
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
state.non_revoked_discovery_requests.push(NonRevokedConnectionRequestState::new(
|
||||
validator_ids,
|
||||
pending,
|
||||
connected,
|
||||
keep_alive,
|
||||
));
|
||||
|
||||
(network_service, authority_discovery_service)
|
||||
}
|
||||
|
||||
/// Should be called when a peer connected.
|
||||
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
||||
pub async fn on_peer_connected(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
peer_set: PeerSet,
|
||||
maybe_authority: Option<AuthorityDiscoveryId>,
|
||||
) {
|
||||
let state = &mut self.state[peer_set];
|
||||
// check if it's an authority we've been waiting for
|
||||
if let Some(authority) = maybe_authority {
|
||||
for request in state.non_revoked_discovery_requests.iter_mut() {
|
||||
let _ = request.on_authority_connected(&authority, &peer_id);
|
||||
}
|
||||
|
||||
state.connected_peers.entry(peer_id).or_default().insert(authority);
|
||||
} else {
|
||||
state.connected_peers.insert(peer_id, Default::default());
|
||||
}
|
||||
}
|
||||
|
||||
/// Should be called when a peer disconnected.
|
||||
pub fn on_peer_disconnected(&mut self, peer_id: &PeerId, peer_set: PeerSet) {
|
||||
self.state[peer_set].connected_peers.remove(peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -339,8 +220,7 @@ mod tests {
|
||||
use crate::network::{Network, NetworkAction};
|
||||
|
||||
use std::{borrow::Cow, pin::Pin};
|
||||
use futures::{sink::Sink, stream::{BoxStream, StreamExt as _}};
|
||||
use sc_network::multiaddr::Protocol;
|
||||
use futures::{sink::Sink, stream::BoxStream};
|
||||
use sc_network::{Event as NetworkEvent, IfDisconnected};
|
||||
use sp_keyring::Sr25519Keyring;
|
||||
use polkadot_node_network_protocol::request_response::request::Requests;
|
||||
@@ -440,132 +320,53 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn request_is_revoked_when_the_receiver_is_dropped() {
|
||||
let (sender, receiver) = mpsc::channel(0);
|
||||
let (keep_alive_handle, keep_alive) = oneshot::channel();
|
||||
|
||||
let mut request = NonRevokedConnectionRequestState::new(
|
||||
Vec::new(),
|
||||
HashSet::new(),
|
||||
sender,
|
||||
keep_alive,
|
||||
);
|
||||
|
||||
assert!(!request.is_revoked());
|
||||
|
||||
drop(receiver);
|
||||
drop(keep_alive_handle);
|
||||
|
||||
assert!(request.is_revoked());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn requests_are_fulfilled_immediately_for_already_connected_peers() {
|
||||
let mut service = new_service();
|
||||
|
||||
let (ns, mut ads) = new_network();
|
||||
|
||||
let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect();
|
||||
let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect();
|
||||
|
||||
futures::executor::block_on(async move {
|
||||
let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()];
|
||||
let (sender, mut receiver) = mpsc::channel(2);
|
||||
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
|
||||
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
|
||||
|
||||
let _ = service.on_request(
|
||||
req1,
|
||||
PeerSet::Validation,
|
||||
sender,
|
||||
ns,
|
||||
ads,
|
||||
).await;
|
||||
|
||||
|
||||
// the results should be immediately available
|
||||
let reply1 = receiver.next().await.unwrap();
|
||||
assert_eq!(reply1.0, authority_ids[0]);
|
||||
assert_eq!(reply1.1, peer_ids[0]);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn requests_are_fulfilled_on_peer_connection() {
|
||||
let mut service = new_service();
|
||||
|
||||
let (ns, ads) = new_network();
|
||||
|
||||
let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect();
|
||||
let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect();
|
||||
|
||||
futures::executor::block_on(async move {
|
||||
let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()];
|
||||
let (sender, mut receiver) = mpsc::channel(2);
|
||||
|
||||
let (_, mut ads) = service.on_request(
|
||||
req1,
|
||||
PeerSet::Validation,
|
||||
sender,
|
||||
ns,
|
||||
ads,
|
||||
).await;
|
||||
|
||||
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
|
||||
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
|
||||
let reply1 = receiver.next().await.unwrap();
|
||||
assert_eq!(reply1.0, authority_ids[0]);
|
||||
assert_eq!(reply1.1, peer_ids[0]);
|
||||
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await;
|
||||
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await;
|
||||
let reply2 = receiver.next().await.unwrap();
|
||||
assert_eq!(reply2.0, authority_ids[1]);
|
||||
assert_eq!(reply2.1, peer_ids[1]);
|
||||
});
|
||||
}
|
||||
|
||||
// Test cleanup works.
|
||||
#[test]
|
||||
fn requests_are_removed_on_revoke() {
|
||||
let mut service = new_service();
|
||||
|
||||
let (ns, mut ads) = new_network();
|
||||
let (ns, ads) = new_network();
|
||||
|
||||
let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect();
|
||||
let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect();
|
||||
|
||||
futures::executor::block_on(async move {
|
||||
let (sender, mut receiver) = mpsc::channel(1);
|
||||
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
|
||||
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await;
|
||||
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await;
|
||||
let (keep_alive_handle, keep_alive) = oneshot::channel();
|
||||
|
||||
let (ns, ads) = service.on_request(
|
||||
vec![authority_ids[0].clone()],
|
||||
PeerSet::Validation,
|
||||
sender,
|
||||
keep_alive,
|
||||
ns,
|
||||
ads,
|
||||
).await;
|
||||
|
||||
let _ = receiver.next().await.unwrap();
|
||||
// revoke the request
|
||||
drop(receiver);
|
||||
drop(keep_alive_handle);
|
||||
|
||||
let (sender, mut receiver) = mpsc::channel(1);
|
||||
let (_keep_alive_handle, keep_alive) = oneshot::channel();
|
||||
|
||||
let _ = service.on_request(
|
||||
vec![authority_ids[1].clone()],
|
||||
PeerSet::Validation,
|
||||
sender,
|
||||
keep_alive,
|
||||
ns,
|
||||
ads,
|
||||
).await;
|
||||
|
||||
let reply = receiver.next().await.unwrap();
|
||||
assert_eq!(reply.0, authority_ids[1]);
|
||||
assert_eq!(reply.1, peer_ids[1]);
|
||||
let state = &service.state[PeerSet::Validation];
|
||||
assert_eq!(state.non_revoked_discovery_requests.len(), 1);
|
||||
});
|
||||
@@ -576,104 +377,54 @@ mod tests {
|
||||
fn revoking_requests_with_overlapping_validator_sets() {
|
||||
let mut service = new_service();
|
||||
|
||||
let (ns, mut ads) = new_network();
|
||||
let (ns, ads) = new_network();
|
||||
|
||||
let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect();
|
||||
let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect();
|
||||
|
||||
futures::executor::block_on(async move {
|
||||
let (sender, mut receiver) = mpsc::channel(1);
|
||||
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
|
||||
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await;
|
||||
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await;
|
||||
let (keep_alive_handle, keep_alive) = oneshot::channel();
|
||||
|
||||
let (ns, ads) = service.on_request(
|
||||
vec![authority_ids[0].clone(), authority_ids[2].clone()],
|
||||
PeerSet::Validation,
|
||||
sender,
|
||||
keep_alive,
|
||||
ns,
|
||||
ads,
|
||||
).await;
|
||||
|
||||
let _ = receiver.next().await.unwrap();
|
||||
// revoke the first request
|
||||
drop(receiver);
|
||||
drop(keep_alive_handle);
|
||||
|
||||
let (sender, mut receiver) = mpsc::channel(1);
|
||||
let (keep_alive_handle, keep_alive) = oneshot::channel();
|
||||
|
||||
let (ns, ads) = service.on_request(
|
||||
vec![authority_ids[0].clone(), authority_ids[1].clone()],
|
||||
PeerSet::Validation,
|
||||
sender,
|
||||
keep_alive,
|
||||
ns,
|
||||
ads,
|
||||
).await;
|
||||
|
||||
let _ = receiver.next().await.unwrap();
|
||||
let state = &service.state[PeerSet::Validation];
|
||||
assert_eq!(state.non_revoked_discovery_requests.len(), 1);
|
||||
assert_eq!(ns.peers_set.len(), 2);
|
||||
|
||||
// revoke the second request
|
||||
drop(receiver);
|
||||
drop(keep_alive_handle);
|
||||
|
||||
let (sender, mut receiver) = mpsc::channel(1);
|
||||
let (_keep_alive_handle, keep_alive) = oneshot::channel();
|
||||
|
||||
let (ns, _) = service.on_request(
|
||||
vec![authority_ids[0].clone()],
|
||||
PeerSet::Validation,
|
||||
sender,
|
||||
keep_alive,
|
||||
ns,
|
||||
ads,
|
||||
).await;
|
||||
|
||||
let _ = receiver.next().await.unwrap();
|
||||
let state = &service.state[PeerSet::Validation];
|
||||
assert_eq!(state.non_revoked_discovery_requests.len(), 1);
|
||||
assert_eq!(ns.peers_set.len(), 1);
|
||||
});
|
||||
}
|
||||
|
||||
/// A test for when a validator connects, but the authority discovery not yet knows that the connecting node
|
||||
/// is a validator. This can happen for example at startup of a node.
|
||||
#[test]
|
||||
fn handle_validator_connect_without_authority_discovery_knowing_it() {
|
||||
let mut service = new_service();
|
||||
|
||||
let ns = TestNetwork::default();
|
||||
let mut ads = TestAuthorityDiscovery::default();
|
||||
|
||||
let validator_peer_id = PeerId::random();
|
||||
let validator_id: AuthorityDiscoveryId = Sr25519Keyring::Alice.public().into();
|
||||
|
||||
futures::executor::block_on(async move {
|
||||
let (sender, mut receiver) = mpsc::channel(1);
|
||||
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(validator_peer_id).await;
|
||||
service.on_peer_connected(validator_peer_id.clone(), PeerSet::Validation, maybe_authority).await;
|
||||
|
||||
let address = known_multiaddr()[0].clone().with(Protocol::P2p(validator_peer_id.clone().into()));
|
||||
ads.by_peer_id.insert(validator_peer_id.clone(), validator_id.clone());
|
||||
ads.by_authority_id.insert(validator_id.clone(), address);
|
||||
|
||||
let _ = service.on_request(
|
||||
vec![validator_id.clone()],
|
||||
PeerSet::Validation,
|
||||
sender,
|
||||
ns,
|
||||
ads,
|
||||
).await;
|
||||
|
||||
assert_eq!((validator_id.clone(), validator_peer_id.clone()), receiver.next().await.unwrap());
|
||||
let state = &service.state[PeerSet::Validation];
|
||||
assert!(
|
||||
state.connected_peers
|
||||
.get(&validator_peer_id)
|
||||
.unwrap()
|
||||
.contains(&validator_id)
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use futures::{FutureExt, channel::oneshot, channel::mpsc};
|
||||
use futures::{FutureExt, channel::oneshot};
|
||||
use sp_core::Pair;
|
||||
|
||||
use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState, GroupIndex, Hash, Id as ParaId};
|
||||
@@ -218,7 +218,7 @@ struct State {
|
||||
peer_ids: HashMap<PeerId, AuthorityDiscoveryId>,
|
||||
|
||||
/// The connection handles to validators per group we are interested in.
|
||||
connection_handles: HashMap<GroupIndex, mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>,
|
||||
connection_handles: HashMap<GroupIndex, oneshot::Sender<()>>,
|
||||
|
||||
/// Metrics.
|
||||
metrics: Metrics,
|
||||
@@ -467,13 +467,13 @@ async fn connect_to_validators(
|
||||
state: &mut State,
|
||||
group: GroupValidators,
|
||||
) {
|
||||
let (tx, rx) = mpsc::channel(0);
|
||||
let (keep_alive_handle, keep_alive) = oneshot::channel();
|
||||
// Reconnect in all cases, as authority discovery cache might not have been fully populated
|
||||
// last time:
|
||||
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators {
|
||||
validator_ids: group.validators, peer_set: PeerSet::Collation, connected: tx
|
||||
validator_ids: group.validators, peer_set: PeerSet::Collation, keep_alive
|
||||
})).await;
|
||||
state.connection_handles.insert(group.group, rx);
|
||||
state.connection_handles.insert(group.group, keep_alive_handle);
|
||||
}
|
||||
|
||||
/// Advertise collation to the given `peer`.
|
||||
@@ -917,7 +917,7 @@ mod tests {
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use futures::{channel::mpsc, executor, future, Future};
|
||||
use futures::{executor, future, Future};
|
||||
|
||||
use sp_core::{crypto::Pair, Decode};
|
||||
use sp_keyring::Sr25519Keyring;
|
||||
@@ -1024,7 +1024,7 @@ mod tests {
|
||||
validators,
|
||||
session_info: SessionInfo {
|
||||
validators: validator_public,
|
||||
discovery_keys,
|
||||
discovery_keys,
|
||||
validator_groups,
|
||||
..Default::default()
|
||||
},
|
||||
@@ -1188,8 +1188,6 @@ mod tests {
|
||||
|
||||
/// Result of [`distribute_collation`]
|
||||
struct DistributeCollation {
|
||||
/// Should be used to inform the subsystem about connected validators.
|
||||
connected: Vec<mpsc::Sender<(AuthorityDiscoveryId, PeerId)>>,
|
||||
candidate: CandidateReceipt,
|
||||
pov_block: PoV,
|
||||
}
|
||||
@@ -1270,32 +1268,26 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
let connected = if should_connect {
|
||||
let connected_current = assert_matches!(
|
||||
if should_connect {
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::ConnectToValidators {
|
||||
connected,
|
||||
..
|
||||
}
|
||||
) => { connected }
|
||||
) => {}
|
||||
);
|
||||
let connected_next = assert_matches!(
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::ConnectToValidators {
|
||||
connected,
|
||||
..
|
||||
}
|
||||
) => { connected }
|
||||
) => {}
|
||||
);
|
||||
vec![connected_current, connected_next]
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
}
|
||||
|
||||
DistributeCollation {
|
||||
connected,
|
||||
candidate,
|
||||
pov_block,
|
||||
}
|
||||
@@ -1416,7 +1408,7 @@ mod tests {
|
||||
|
||||
setup_system(&mut virtual_overseer, &test_state).await;
|
||||
|
||||
let DistributeCollation { connected: _connected, candidate, pov_block } =
|
||||
let DistributeCollation { candidate, pov_block } =
|
||||
distribute_collation(&mut virtual_overseer, &test_state, true).await;
|
||||
|
||||
for (val, peer) in test_state.current_group_validator_authority_ids()
|
||||
@@ -1500,8 +1492,7 @@ mod tests {
|
||||
|
||||
assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
|
||||
|
||||
let DistributeCollation { connected: _connected, .. } =
|
||||
distribute_collation(&mut virtual_overseer, &test_state, true).await;
|
||||
distribute_collation(&mut virtual_overseer, &test_state, true).await;
|
||||
|
||||
// Send info about peer's view.
|
||||
overseer_send(
|
||||
@@ -1569,7 +1560,7 @@ mod tests {
|
||||
// And let it tell us that it is has the same view.
|
||||
send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await;
|
||||
|
||||
let _connected = distribute_collation(&mut virtual_overseer, &test_state, true).await.connected;
|
||||
distribute_collation(&mut virtual_overseer, &test_state, true).await;
|
||||
|
||||
expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await;
|
||||
|
||||
@@ -1608,14 +1599,14 @@ mod tests {
|
||||
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
|
||||
expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
|
||||
|
||||
let _connected = distribute_collation(&mut virtual_overseer, &test_state, true).await.connected;
|
||||
distribute_collation(&mut virtual_overseer, &test_state, true).await;
|
||||
|
||||
let old_relay_parent = test_state.relay_parent;
|
||||
|
||||
// Advance to a new round, while informing the subsystem that the old and the new relay parent are active.
|
||||
test_state.advance_to_new_round(&mut virtual_overseer, true).await;
|
||||
|
||||
let _connected = distribute_collation(&mut virtual_overseer, &test_state, true).await.connected;
|
||||
distribute_collation(&mut virtual_overseer, &test_state, true).await;
|
||||
|
||||
send_peer_view_change(&mut virtual_overseer, &peer, vec![old_relay_parent]).await;
|
||||
expect_advertise_collation_msg(&mut virtual_overseer, &peer, old_relay_parent).await;
|
||||
@@ -1645,7 +1636,7 @@ mod tests {
|
||||
connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id.clone())).await;
|
||||
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
|
||||
|
||||
let _ = distribute_collation(&mut virtual_overseer, &test_state, true).await.connected;
|
||||
distribute_collation(&mut virtual_overseer, &test_state, true).await;
|
||||
|
||||
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
|
||||
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
|
||||
|
||||
@@ -18,22 +18,19 @@
|
||||
//! and issuing a connection request to the validators relevant to
|
||||
//! the gossiping subsystems on every new session.
|
||||
|
||||
use futures::{channel::mpsc, FutureExt as _};
|
||||
use futures::{channel::oneshot, FutureExt as _};
|
||||
use polkadot_node_subsystem::{
|
||||
messages::{
|
||||
GossipSupportMessage,
|
||||
AllMessages, GossipSupportMessage, NetworkBridgeMessage,
|
||||
},
|
||||
ActiveLeavesUpdate, FromOverseer, OverseerSignal,
|
||||
Subsystem, SpawnedSubsystem, SubsystemContext,
|
||||
};
|
||||
use polkadot_node_subsystem_util::{
|
||||
validator_discovery,
|
||||
self as util,
|
||||
};
|
||||
use polkadot_node_subsystem_util as util;
|
||||
use polkadot_primitives::v1::{
|
||||
Hash, SessionIndex, AuthorityDiscoveryId,
|
||||
};
|
||||
use polkadot_node_network_protocol::{peer_set::PeerSet, PeerId};
|
||||
use polkadot_node_network_protocol::peer_set::PeerSet;
|
||||
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
|
||||
use sp_application_crypto::{Public, AppKey};
|
||||
|
||||
@@ -48,7 +45,7 @@ pub struct GossipSupport {
|
||||
struct State {
|
||||
last_session_index: Option<SessionIndex>,
|
||||
/// when we overwrite this, it automatically drops the previous request
|
||||
_last_connection_request: Option<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>,
|
||||
_last_connection_request: Option<oneshot::Sender<()>>,
|
||||
}
|
||||
|
||||
impl GossipSupport {
|
||||
@@ -123,6 +120,24 @@ async fn ensure_i_am_an_authority(
|
||||
Err(util::Error::NotAValidator)
|
||||
}
|
||||
|
||||
/// A helper function for making a `ConnectToValidators` request.
|
||||
pub async fn connect_to_authorities(
|
||||
ctx: &mut impl SubsystemContext,
|
||||
validator_ids: Vec<AuthorityDiscoveryId>,
|
||||
peer_set: PeerSet,
|
||||
) -> oneshot::Sender<()> {
|
||||
let (keep_alive_handle, keep_alive) = oneshot::channel();
|
||||
|
||||
ctx.send_message(AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::ConnectToValidators {
|
||||
validator_ids,
|
||||
peer_set,
|
||||
keep_alive,
|
||||
}
|
||||
)).await;
|
||||
|
||||
keep_alive_handle
|
||||
}
|
||||
|
||||
impl State {
|
||||
/// 1. Determine if the current session index has changed.
|
||||
@@ -147,14 +162,14 @@ impl State {
|
||||
ensure_i_am_an_authority(keystore, &authorities).await?;
|
||||
tracing::debug!(target: LOG_TARGET, num = ?authorities.len(), "Issuing a connection request");
|
||||
|
||||
let request = validator_discovery::connect_to_authorities(
|
||||
let keep_alive_handle = connect_to_authorities(
|
||||
ctx,
|
||||
authorities,
|
||||
PeerSet::Validation,
|
||||
).await;
|
||||
|
||||
self.last_session_index = Some(new_session);
|
||||
self._last_connection_request = Some(request);
|
||||
self._last_connection_request = Some(keep_alive_handle);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -51,7 +51,6 @@ use std::{
|
||||
use streamunordered::{StreamUnordered, StreamYield};
|
||||
use thiserror::Error;
|
||||
|
||||
pub mod validator_discovery;
|
||||
pub use metered_channel as metered;
|
||||
pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
|
||||
|
||||
|
||||
@@ -1,524 +0,0 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Utility function to make it easier to connect to validators.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::pin::Pin;
|
||||
|
||||
use futures::{
|
||||
channel::mpsc,
|
||||
task::{Poll, self},
|
||||
stream,
|
||||
StreamExt,
|
||||
};
|
||||
use streamunordered::{StreamUnordered, StreamYield};
|
||||
|
||||
use polkadot_node_subsystem::{
|
||||
errors::RuntimeApiError,
|
||||
messages::{AllMessages, NetworkBridgeMessage},
|
||||
SubsystemContext,
|
||||
};
|
||||
use polkadot_primitives::v1::{
|
||||
Hash, ValidatorId, AuthorityDiscoveryId, SessionIndex, Id as ParaId,
|
||||
};
|
||||
use polkadot_node_network_protocol::peer_set::PeerSet;
|
||||
use sc_network::PeerId;
|
||||
use crate::Error;
|
||||
|
||||
/// Utility function to make it easier to connect to validators.
|
||||
pub async fn connect_to_validators<Context: SubsystemContext>(
|
||||
ctx: &mut Context,
|
||||
relay_parent: Hash,
|
||||
validators: Vec<ValidatorId>,
|
||||
peer_set: PeerSet,
|
||||
) -> Result<ConnectionRequest, Error> {
|
||||
let current_index = crate::request_session_index_for_child(relay_parent, ctx.sender()).await.await??;
|
||||
connect_to_validators_in_session(
|
||||
ctx,
|
||||
relay_parent,
|
||||
validators,
|
||||
peer_set,
|
||||
current_index,
|
||||
).await
|
||||
}
|
||||
|
||||
/// Utility function to make it easier to connect to validators in the given session.
|
||||
pub async fn connect_to_validators_in_session<Context: SubsystemContext>(
|
||||
ctx: &mut Context,
|
||||
relay_parent: Hash,
|
||||
validators: Vec<ValidatorId>,
|
||||
peer_set: PeerSet,
|
||||
session_index: SessionIndex,
|
||||
) -> Result<ConnectionRequest, Error> {
|
||||
let session_info = crate::request_session_info(
|
||||
relay_parent,
|
||||
session_index,
|
||||
ctx.sender(),
|
||||
).await.await??;
|
||||
|
||||
let (session_validators, discovery_keys) = match session_info {
|
||||
Some(info) => (info.validators, info.discovery_keys),
|
||||
None => return Err(RuntimeApiError::from(
|
||||
format!("No SessionInfo found for the index {}", session_index)
|
||||
).into()),
|
||||
};
|
||||
|
||||
tracing::trace!(
|
||||
target: "parachain::validator-discovery",
|
||||
validators = ?validators,
|
||||
discovery_keys = ?discovery_keys,
|
||||
session_index,
|
||||
"Trying to serve the validator discovery request",
|
||||
);
|
||||
|
||||
let id_to_index = session_validators.iter()
|
||||
.zip(0usize..)
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
// We assume the same ordering in authorities as in validators so we can do an index search
|
||||
let maybe_authorities: Vec<_> = validators.iter()
|
||||
.map(|id| {
|
||||
let validator_index = id_to_index.get(&id);
|
||||
validator_index.and_then(|i| discovery_keys.get(*i).cloned())
|
||||
})
|
||||
.collect();
|
||||
|
||||
let authorities: Vec<_> = maybe_authorities.iter()
|
||||
.cloned()
|
||||
.filter_map(|id| id)
|
||||
.collect();
|
||||
|
||||
let validator_map = validators.into_iter()
|
||||
.zip(maybe_authorities.into_iter())
|
||||
.filter_map(|(k, v)| v.map(|v| (v, k)))
|
||||
.collect::<HashMap<AuthorityDiscoveryId, ValidatorId>>();
|
||||
|
||||
let connections = connect_to_authorities(ctx, authorities, peer_set).await;
|
||||
|
||||
Ok(ConnectionRequest {
|
||||
validator_map,
|
||||
connections,
|
||||
})
|
||||
}
|
||||
|
||||
/// A helper function for making a `ConnectToValidators` request.
|
||||
pub async fn connect_to_authorities<Context: SubsystemContext>(
|
||||
ctx: &mut Context,
|
||||
validator_ids: Vec<AuthorityDiscoveryId>,
|
||||
peer_set: PeerSet,
|
||||
) -> mpsc::Receiver<(AuthorityDiscoveryId, PeerId)> {
|
||||
const PEERS_CAPACITY: usize = 32;
|
||||
|
||||
let (connected, connected_rx) = mpsc::channel(PEERS_CAPACITY);
|
||||
|
||||
ctx.send_message(AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::ConnectToValidators {
|
||||
validator_ids,
|
||||
peer_set,
|
||||
connected,
|
||||
}
|
||||
)).await;
|
||||
|
||||
connected_rx
|
||||
}
|
||||
|
||||
/// Represents a discovered validator.
|
||||
///
|
||||
/// Result of [`ConnectionRequests::next`].
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct DiscoveredValidator {
|
||||
/// The relay parent associated with the connection request that returned a result.
|
||||
pub relay_parent: Hash,
|
||||
/// The para ID associated with the connection request that returned a result.
|
||||
pub para_id: ParaId,
|
||||
/// The [`ValidatorId`] that was resolved.
|
||||
pub validator_id: ValidatorId,
|
||||
/// The [`PeerId`] associated to the validator id.
|
||||
pub peer_id: PeerId,
|
||||
}
|
||||
|
||||
/// Used by [`ConnectionRequests::requests`] to map a [`ConnectionRequest`] item to a [`DiscoveredValidator`].
|
||||
struct ConnectionRequestForRelayParentAndParaId {
|
||||
request: ConnectionRequest,
|
||||
relay_parent: Hash,
|
||||
para_id: ParaId,
|
||||
}
|
||||
|
||||
impl stream::Stream for ConnectionRequestForRelayParentAndParaId {
|
||||
type Item = DiscoveredValidator;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
|
||||
self.request
|
||||
.poll_next_unpin(cx)
|
||||
.map(|r| r.map(|(validator_id, peer_id)| DiscoveredValidator {
|
||||
validator_id,
|
||||
peer_id,
|
||||
relay_parent: self.relay_parent,
|
||||
para_id: self.para_id,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
/// A struct that assists performing multiple concurrent connection requests.
|
||||
///
|
||||
/// This allows concurrent connections to validator sets at different `(relay_parents, para_id)`.
|
||||
/// Use [`ConnectionRequests::next`] to wait for results of the added connection requests.
|
||||
#[derive(Default)]
|
||||
pub struct ConnectionRequests {
|
||||
/// Connection requests relay_parent -> para_id -> StreamUnordered token
|
||||
///
|
||||
/// Q: Why not (relay_parent, para_id) -> Stream?
|
||||
/// A: So that we can remove from it by relay_parent only.
|
||||
id_map: HashMap<Hash, HashMap<ParaId, usize>>,
|
||||
|
||||
/// Connection requests themselves.
|
||||
requests: StreamUnordered<ConnectionRequestForRelayParentAndParaId>,
|
||||
}
|
||||
|
||||
impl ConnectionRequests {
|
||||
/// Insert a new connection request.
|
||||
///
|
||||
/// If a `ConnectionRequest` under a given `relay_parent` and `para_id` already exists,
|
||||
/// it will be revoked and substituted with the given one.
|
||||
pub fn put(&mut self, relay_parent: Hash, para_id: ParaId, request: ConnectionRequest) {
|
||||
self.remove(&relay_parent, para_id);
|
||||
let token = self.requests.push(ConnectionRequestForRelayParentAndParaId {
|
||||
relay_parent,
|
||||
para_id,
|
||||
request,
|
||||
});
|
||||
|
||||
self.id_map.entry(relay_parent).or_default().insert(para_id, token);
|
||||
}
|
||||
|
||||
/// Remove all connection requests by a given `relay_parent`.
|
||||
pub fn remove_all(&mut self, relay_parent: &Hash) {
|
||||
let map = self.id_map.remove(relay_parent);
|
||||
for token in map.map(|m| m.into_iter().map(|(_, v)| v)).into_iter().flatten() {
|
||||
Pin::new(&mut self.requests).remove(token);
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove a connection request by a given `relay_parent` and `para_id`.
|
||||
pub fn remove(&mut self, relay_parent: &Hash, para_id: ParaId) {
|
||||
if let Some(map) = self.id_map.get_mut(relay_parent) {
|
||||
if let Some(token) = map.remove(¶_id) {
|
||||
Pin::new(&mut self.requests).remove(token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Is a connection at this relay parent and para_id already present in the request
|
||||
pub fn contains_request(&self, relay_parent: &Hash, para_id: ParaId) -> bool {
|
||||
self.id_map.get(relay_parent).map_or(false, |map| map.contains_key(¶_id))
|
||||
}
|
||||
|
||||
/// Returns the next available connection request result.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// When there are no active requests this will wait indefinitely, like an always pending future.
|
||||
pub async fn next(&mut self) -> DiscoveredValidator {
|
||||
loop {
|
||||
match self.requests.next().await {
|
||||
Some((StreamYield::Item(item), _)) => {
|
||||
return item
|
||||
},
|
||||
// Ignore finished requests, they are required to be removed.
|
||||
Some((StreamYield::Finished(_), _)) => (),
|
||||
None => futures::pending!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A pending connection request to validators.
|
||||
/// This struct implements `Stream` to allow for asynchronous
|
||||
/// discovery of validator addresses.
|
||||
///
|
||||
/// NOTE: the request will be revoked on drop.
|
||||
#[must_use = "dropping a request will result in its immediate revokation"]
|
||||
pub struct ConnectionRequest {
|
||||
validator_map: HashMap<AuthorityDiscoveryId, ValidatorId>,
|
||||
#[must_use = "streams do nothing unless polled"]
|
||||
connections: mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>,
|
||||
}
|
||||
|
||||
impl stream::Stream for ConnectionRequest {
|
||||
type Item = (ValidatorId, PeerId);
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
|
||||
if self.validator_map.is_empty() {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
match Pin::new(&mut self.connections).poll_next(cx) {
|
||||
Poll::Ready(Some((id, peer_id))) => {
|
||||
if let Some(validator_id) = self.validator_map.remove(&id) {
|
||||
return Poll::Ready(Some((validator_id, peer_id)));
|
||||
} else {
|
||||
// unknown authority_id
|
||||
// should be unreachable
|
||||
}
|
||||
}
|
||||
_ => {},
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use polkadot_primitives::v1::ValidatorPair;
|
||||
use sp_core::{Pair, Public};
|
||||
|
||||
use futures::{executor, poll, SinkExt};
|
||||
|
||||
async fn check_next_is_pending(connection_requests: &mut ConnectionRequests) {
|
||||
let next = connection_requests.next();
|
||||
futures::pin_mut!(next);
|
||||
assert_eq!(poll!(next), Poll::Pending);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn adding_a_connection_request_works() {
|
||||
let mut connection_requests = ConnectionRequests::default();
|
||||
|
||||
executor::block_on(async move {
|
||||
check_next_is_pending(&mut connection_requests).await;
|
||||
|
||||
let validator_1 = ValidatorPair::generate().0.public();
|
||||
let validator_2 = ValidatorPair::generate().0.public();
|
||||
|
||||
let auth_1 = AuthorityDiscoveryId::from_slice(&[1; 32]);
|
||||
let auth_2 = AuthorityDiscoveryId::from_slice(&[2; 32]);
|
||||
|
||||
let mut validator_map = HashMap::new();
|
||||
validator_map.insert(auth_1.clone(), validator_1.clone());
|
||||
validator_map.insert(auth_2.clone(), validator_2.clone());
|
||||
|
||||
let (mut rq1_tx, rq1_rx) = mpsc::channel(8);
|
||||
|
||||
let peer_id_1 = PeerId::random();
|
||||
let peer_id_2 = PeerId::random();
|
||||
|
||||
let connection_request_1 = ConnectionRequest {
|
||||
validator_map,
|
||||
connections: rq1_rx,
|
||||
};
|
||||
|
||||
let relay_parent_1 = Hash::repeat_byte(1);
|
||||
let para_id = ParaId::from(3);
|
||||
connection_requests.put(relay_parent_1.clone(), para_id, connection_request_1);
|
||||
|
||||
rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap();
|
||||
rq1_tx.send((auth_2, peer_id_2.clone())).await.unwrap();
|
||||
|
||||
let res = connection_requests.next().await;
|
||||
assert_eq!(
|
||||
res,
|
||||
DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_1, peer_id: peer_id_1 },
|
||||
);
|
||||
|
||||
let res = connection_requests.next().await;
|
||||
assert_eq!(
|
||||
res,
|
||||
DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_2, peer_id: peer_id_2 },
|
||||
);
|
||||
|
||||
check_next_is_pending(&mut connection_requests).await;
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn adding_two_connection_requests_works() {
|
||||
let mut connection_requests = ConnectionRequests::default();
|
||||
|
||||
executor::block_on(async move {
|
||||
check_next_is_pending(&mut connection_requests).await;
|
||||
|
||||
let validator_1 = ValidatorPair::generate().0.public();
|
||||
let validator_2 = ValidatorPair::generate().0.public();
|
||||
|
||||
let auth_1 = AuthorityDiscoveryId::from_slice(&[1; 32]);
|
||||
let auth_2 = AuthorityDiscoveryId::from_slice(&[2; 32]);
|
||||
|
||||
let mut validator_map_1 = HashMap::new();
|
||||
let mut validator_map_2 = HashMap::new();
|
||||
|
||||
validator_map_1.insert(auth_1.clone(), validator_1.clone());
|
||||
validator_map_2.insert(auth_2.clone(), validator_2.clone());
|
||||
|
||||
let (mut rq1_tx, rq1_rx) = mpsc::channel(8);
|
||||
|
||||
let (mut rq2_tx, rq2_rx) = mpsc::channel(8);
|
||||
|
||||
let peer_id_1 = PeerId::random();
|
||||
let peer_id_2 = PeerId::random();
|
||||
|
||||
let connection_request_1 = ConnectionRequest {
|
||||
validator_map: validator_map_1,
|
||||
connections: rq1_rx,
|
||||
};
|
||||
|
||||
let connection_request_2 = ConnectionRequest {
|
||||
validator_map: validator_map_2,
|
||||
connections: rq2_rx,
|
||||
};
|
||||
|
||||
let relay_parent_1 = Hash::repeat_byte(1);
|
||||
let relay_parent_2 = Hash::repeat_byte(2);
|
||||
let para_id = ParaId::from(3);
|
||||
|
||||
connection_requests.put(relay_parent_1.clone(), para_id, connection_request_1);
|
||||
connection_requests.put(relay_parent_2.clone(), para_id, connection_request_2);
|
||||
|
||||
rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap();
|
||||
rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap();
|
||||
|
||||
let res = connection_requests.next().await;
|
||||
assert_eq!(
|
||||
res,
|
||||
DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_1, peer_id: peer_id_1 },
|
||||
);
|
||||
|
||||
let res = connection_requests.next().await;
|
||||
assert_eq!(
|
||||
res,
|
||||
DiscoveredValidator { relay_parent: relay_parent_2, para_id, validator_id: validator_2, peer_id: peer_id_2 },
|
||||
);
|
||||
|
||||
check_next_is_pending(&mut connection_requests).await;
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn same_relay_parent_diffent_para_ids() {
|
||||
let mut connection_requests = ConnectionRequests::default();
|
||||
|
||||
executor::block_on(async move {
|
||||
check_next_is_pending(&mut connection_requests).await;
|
||||
|
||||
let validator_1 = ValidatorPair::generate().0.public();
|
||||
let validator_2 = ValidatorPair::generate().0.public();
|
||||
|
||||
let auth_1 = AuthorityDiscoveryId::from_slice(&[1; 32]);
|
||||
let auth_2 = AuthorityDiscoveryId::from_slice(&[2; 32]);
|
||||
|
||||
let mut validator_map_1 = HashMap::new();
|
||||
let mut validator_map_2 = HashMap::new();
|
||||
|
||||
validator_map_1.insert(auth_1.clone(), validator_1.clone());
|
||||
validator_map_2.insert(auth_2.clone(), validator_2.clone());
|
||||
|
||||
let (mut rq1_tx, rq1_rx) = mpsc::channel(8);
|
||||
let (mut rq2_tx, rq2_rx) = mpsc::channel(8);
|
||||
|
||||
let peer_id_1 = PeerId::random();
|
||||
let peer_id_2 = PeerId::random();
|
||||
|
||||
let connection_request_1 = ConnectionRequest {
|
||||
validator_map: validator_map_1,
|
||||
connections: rq1_rx,
|
||||
};
|
||||
|
||||
let connection_request_2 = ConnectionRequest {
|
||||
validator_map: validator_map_2,
|
||||
connections: rq2_rx,
|
||||
};
|
||||
|
||||
let relay_parent = Hash::repeat_byte(1);
|
||||
let para_id_1 = ParaId::from(1);
|
||||
let para_id_2 = ParaId::from(2);
|
||||
|
||||
connection_requests.put(relay_parent.clone(), para_id_1, connection_request_1);
|
||||
connection_requests.put(relay_parent.clone(), para_id_2, connection_request_2);
|
||||
|
||||
rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap();
|
||||
rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap();
|
||||
|
||||
connection_requests.remove(&relay_parent, para_id_1);
|
||||
|
||||
let res = connection_requests.next().await;
|
||||
assert_eq!(
|
||||
res,
|
||||
DiscoveredValidator { relay_parent, para_id: para_id_2, validator_id: validator_2, peer_id: peer_id_2 },
|
||||
);
|
||||
|
||||
check_next_is_pending(&mut connection_requests).await;
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replacing_a_connection_request_works() {
|
||||
let mut connection_requests = ConnectionRequests::default();
|
||||
|
||||
executor::block_on(async move {
|
||||
check_next_is_pending(&mut connection_requests).await;
|
||||
|
||||
let validator_1 = ValidatorPair::generate().0.public();
|
||||
let validator_2 = ValidatorPair::generate().0.public();
|
||||
|
||||
let auth_1 = AuthorityDiscoveryId::from_slice(&[1; 32]);
|
||||
let auth_2 = AuthorityDiscoveryId::from_slice(&[2; 32]);
|
||||
|
||||
let mut validator_map_1 = HashMap::new();
|
||||
let mut validator_map_2 = HashMap::new();
|
||||
|
||||
validator_map_1.insert(auth_1.clone(), validator_1.clone());
|
||||
validator_map_2.insert(auth_2.clone(), validator_2.clone());
|
||||
|
||||
let (mut rq1_tx, rq1_rx) = mpsc::channel(8);
|
||||
|
||||
let (mut rq2_tx, rq2_rx) = mpsc::channel(8);
|
||||
|
||||
let peer_id_1 = PeerId::random();
|
||||
let peer_id_2 = PeerId::random();
|
||||
|
||||
let connection_request_1 = ConnectionRequest {
|
||||
validator_map: validator_map_1,
|
||||
connections: rq1_rx,
|
||||
};
|
||||
|
||||
let connection_request_2 = ConnectionRequest {
|
||||
validator_map: validator_map_2,
|
||||
connections: rq2_rx,
|
||||
};
|
||||
|
||||
let relay_parent = Hash::repeat_byte(3);
|
||||
let para_id = ParaId::from(3);
|
||||
|
||||
connection_requests.put(relay_parent.clone(), para_id, connection_request_1);
|
||||
|
||||
rq1_tx.send((auth_1.clone(), peer_id_1.clone())).await.unwrap();
|
||||
|
||||
let res = connection_requests.next().await;
|
||||
assert_eq!(res, DiscoveredValidator { relay_parent, para_id, validator_id: validator_1, peer_id: peer_id_1.clone() });
|
||||
|
||||
connection_requests.put(relay_parent.clone(), para_id, connection_request_2);
|
||||
|
||||
assert!(rq1_tx.send((auth_1, peer_id_1.clone())).await.is_err());
|
||||
|
||||
rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap();
|
||||
|
||||
let res = connection_requests.next().await;
|
||||
assert_eq!(res, DiscoveredValidator { relay_parent, para_id, validator_id: validator_2, peer_id: peer_id_2 });
|
||||
|
||||
check_next_is_pending(&mut connection_requests).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -241,16 +241,17 @@ pub enum NetworkBridgeMessage {
|
||||
///
|
||||
/// Also ask the network to stay connected to these peers at least
|
||||
/// until the request is revoked.
|
||||
/// This can be done by dropping the receiver.
|
||||
///
|
||||
/// A caller can learn about validator connections by listening to the
|
||||
/// `PeerConnected` events from the network bridge.
|
||||
ConnectToValidators {
|
||||
/// Ids of the validators to connect to.
|
||||
validator_ids: Vec<AuthorityDiscoveryId>,
|
||||
/// The underlying protocol to use for this request.
|
||||
peer_set: PeerSet,
|
||||
/// Response sender by which the issuer can learn the `PeerId`s of
|
||||
/// the validators as they are connected.
|
||||
/// The response is sent immediately for already connected peers.
|
||||
connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
|
||||
/// A request is revoked by dropping the `keep_alive` sender.
|
||||
/// The revokation takes place upon the next connection request.
|
||||
keep_alive: oneshot::Receiver<()>,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user