mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 04:41:02 +00:00
Infrastructure improvements (#2897)
* Factor out runtime module into utils. * Add maybe_authority information to `PeerConnected` event. We already gather this information in authority discovery, so we might as well share it with others. This opens up an easy path to trigger validators differently from normal nodes, e.g. for prioritization. This change has become more important now, that we just connect to all validators and therefore just have a long peer list without any information about those nodes. * Test fix.
This commit is contained in:
@@ -36,7 +36,7 @@ use polkadot_subsystem::messages::{
|
||||
NetworkBridgeMessage, AllMessages,
|
||||
CollatorProtocolMessage, NetworkBridgeEvent,
|
||||
};
|
||||
use polkadot_primitives::v1::{Hash, BlockNumber};
|
||||
use polkadot_primitives::v1::{Hash, BlockNumber, AuthorityDiscoveryId};
|
||||
use polkadot_node_network_protocol::{
|
||||
PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView, UnifiedReputationChange as Rep,
|
||||
ObservedRole,
|
||||
@@ -316,7 +316,7 @@ impl From<SubsystemError> for UnexpectedAbort {
|
||||
|
||||
// notifications to be passed through to the validator discovery worker.
|
||||
enum ValidatorDiscoveryNotification {
|
||||
PeerConnected(PeerId, PeerSet),
|
||||
PeerConnected(PeerId, PeerSet, Option<AuthorityDiscoveryId>),
|
||||
PeerDisconnected(PeerId, PeerSet),
|
||||
}
|
||||
|
||||
@@ -540,11 +540,11 @@ where
|
||||
},
|
||||
notification = validator_discovery_notifications.next().fuse() => match notification {
|
||||
None => return Ok(()),
|
||||
Some(ValidatorDiscoveryNotification::PeerConnected(peer, peer_set)) => {
|
||||
Some(ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_auth)) => {
|
||||
validator_discovery.on_peer_connected(
|
||||
peer.clone(),
|
||||
peer_set,
|
||||
&mut authority_discovery_service,
|
||||
maybe_auth,
|
||||
).await;
|
||||
}
|
||||
Some(ValidatorDiscoveryNotification::PeerDisconnected(peer, peer_set)) => {
|
||||
@@ -555,9 +555,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_network_messages(
|
||||
async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
|
||||
mut sender: impl SubsystemSender,
|
||||
mut network_service: impl Network,
|
||||
mut authority_discovery_service: AD,
|
||||
mut request_multiplexer: RequestMultiplexer,
|
||||
mut validator_discovery_notifications: mpsc::Sender<ValidatorDiscoveryNotification>,
|
||||
metrics: Metrics,
|
||||
@@ -607,10 +608,14 @@ async fn handle_network_messages(
|
||||
shared.local_view.clone().unwrap_or(View::default())
|
||||
};
|
||||
|
||||
let maybe_authority =
|
||||
authority_discovery_service
|
||||
.get_authority_id_by_peer_id(peer).await;
|
||||
|
||||
// Failure here means that the other side of the network bridge
|
||||
// has concluded and this future will be dropped in due course.
|
||||
let _ = validator_discovery_notifications.send(
|
||||
ValidatorDiscoveryNotification::PeerConnected(peer.clone(), peer_set)
|
||||
ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_authority.clone())
|
||||
).await;
|
||||
|
||||
|
||||
@@ -618,7 +623,7 @@ async fn handle_network_messages(
|
||||
PeerSet::Validation => {
|
||||
dispatch_validation_events_to_all(
|
||||
vec![
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), role),
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), role, maybe_authority),
|
||||
NetworkBridgeEvent::PeerViewChange(
|
||||
peer.clone(),
|
||||
View::default(),
|
||||
@@ -640,7 +645,7 @@ async fn handle_network_messages(
|
||||
PeerSet::Collation => {
|
||||
dispatch_collation_events_to_all(
|
||||
vec![
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), role),
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), role, maybe_authority),
|
||||
NetworkBridgeEvent::PeerViewChange(
|
||||
peer.clone(),
|
||||
View::default(),
|
||||
@@ -858,6 +863,7 @@ where
|
||||
let (remote, network_event_handler) = handle_network_messages(
|
||||
ctx.sender().clone(),
|
||||
network_service.clone(),
|
||||
authority_discovery_service.clone(),
|
||||
request_multiplexer,
|
||||
validation_worker_tx,
|
||||
metrics.clone(),
|
||||
@@ -1191,6 +1197,7 @@ mod tests {
|
||||
_req_configs: Vec<RequestResponseConfig>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct TestAuthorityDiscovery;
|
||||
|
||||
// The test's view of the network. This receives updates from the subsystem in the form
|
||||
@@ -1796,7 +1803,7 @@ mod tests {
|
||||
// bridge will inform about all connected peers.
|
||||
{
|
||||
assert_sends_validation_event_to_all(
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
|
||||
&mut virtual_overseer,
|
||||
).await;
|
||||
|
||||
@@ -1848,7 +1855,7 @@ mod tests {
|
||||
// bridge will inform about all connected peers.
|
||||
{
|
||||
assert_sends_validation_event_to_all(
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
|
||||
&mut virtual_overseer,
|
||||
).await;
|
||||
|
||||
@@ -1920,7 +1927,7 @@ mod tests {
|
||||
// bridge will inform about all connected peers.
|
||||
{
|
||||
assert_sends_validation_event_to_all(
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
|
||||
&mut virtual_overseer,
|
||||
).await;
|
||||
|
||||
@@ -1932,7 +1939,7 @@ mod tests {
|
||||
|
||||
{
|
||||
assert_sends_collation_event_to_all(
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
|
||||
&mut virtual_overseer,
|
||||
).await;
|
||||
|
||||
@@ -2004,7 +2011,7 @@ mod tests {
|
||||
// bridge will inform about all connected peers.
|
||||
{
|
||||
assert_sends_validation_event_to_all(
|
||||
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full),
|
||||
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None),
|
||||
&mut virtual_overseer,
|
||||
).await;
|
||||
|
||||
@@ -2016,7 +2023,7 @@ mod tests {
|
||||
|
||||
{
|
||||
assert_sends_collation_event_to_all(
|
||||
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full),
|
||||
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None),
|
||||
&mut virtual_overseer,
|
||||
).await;
|
||||
|
||||
@@ -2099,7 +2106,7 @@ mod tests {
|
||||
// bridge will inform about all connected peers.
|
||||
{
|
||||
assert_sends_validation_event_to_all(
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
|
||||
&mut virtual_overseer,
|
||||
).await;
|
||||
|
||||
@@ -2111,7 +2118,7 @@ mod tests {
|
||||
|
||||
{
|
||||
assert_sends_collation_event_to_all(
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
|
||||
&mut virtual_overseer,
|
||||
).await;
|
||||
|
||||
@@ -2259,7 +2266,7 @@ mod tests {
|
||||
// bridge will inform about all connected peers.
|
||||
{
|
||||
assert_sends_validation_event_to_all(
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
|
||||
&mut virtual_overseer,
|
||||
).await;
|
||||
|
||||
@@ -2271,7 +2278,7 @@ mod tests {
|
||||
|
||||
{
|
||||
assert_sends_collation_event_to_all(
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
|
||||
&mut virtual_overseer,
|
||||
).await;
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ const LOG_TARGET: &str = "parachain::validator-discovery";
|
||||
|
||||
/// An abstraction over the authority discovery service.
|
||||
#[async_trait]
|
||||
pub trait AuthorityDiscovery: Send + 'static {
|
||||
pub trait AuthorityDiscovery: Send + Clone + 'static {
|
||||
/// Get the addresses for the given [`AuthorityId`] from the local address cache.
|
||||
async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>>;
|
||||
/// Get the [`AuthorityId`] for the given [`PeerId`] from the local address cache.
|
||||
@@ -307,16 +307,15 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
||||
}
|
||||
|
||||
/// Should be called when a peer connected.
|
||||
#[tracing::instrument(level = "trace", skip(self, authority_discovery_service), fields(subsystem = LOG_TARGET))]
|
||||
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
||||
pub async fn on_peer_connected(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
peer_set: PeerSet,
|
||||
authority_discovery_service: &mut AD,
|
||||
maybe_authority: Option<AuthorityDiscoveryId>,
|
||||
) {
|
||||
let state = &mut self.state[peer_set];
|
||||
// check if it's an authority we've been waiting for
|
||||
let maybe_authority = authority_discovery_service.get_authority_id_by_peer_id(peer_id.clone()).await;
|
||||
if let Some(authority) = maybe_authority {
|
||||
for request in state.non_revoked_discovery_requests.iter_mut() {
|
||||
let _ = request.on_authority_connected(&authority, &peer_id);
|
||||
@@ -359,7 +358,7 @@ mod tests {
|
||||
peers_set: HashSet<Multiaddr>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Clone)]
|
||||
struct TestAuthorityDiscovery {
|
||||
by_authority_id: HashMap<AuthorityDiscoveryId, Multiaddr>,
|
||||
by_peer_id: HashMap<PeerId, AuthorityDiscoveryId>,
|
||||
@@ -469,7 +468,8 @@ mod tests {
|
||||
let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()];
|
||||
let (sender, mut receiver) = mpsc::channel(2);
|
||||
|
||||
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await;
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
|
||||
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
|
||||
|
||||
let _ = service.on_request(
|
||||
req1,
|
||||
@@ -509,12 +509,14 @@ mod tests {
|
||||
).await;
|
||||
|
||||
|
||||
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await;
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
|
||||
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
|
||||
let reply1 = receiver.next().await.unwrap();
|
||||
assert_eq!(reply1.0, authority_ids[0]);
|
||||
assert_eq!(reply1.1, peer_ids[0]);
|
||||
|
||||
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await;
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await;
|
||||
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await;
|
||||
let reply2 = receiver.next().await.unwrap();
|
||||
assert_eq!(reply2.0, authority_ids[1]);
|
||||
assert_eq!(reply2.1, peer_ids[1]);
|
||||
@@ -534,8 +536,10 @@ mod tests {
|
||||
futures::executor::block_on(async move {
|
||||
let (sender, mut receiver) = mpsc::channel(1);
|
||||
|
||||
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await;
|
||||
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await;
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
|
||||
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await;
|
||||
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await;
|
||||
|
||||
let (ns, ads) = service.on_request(
|
||||
vec![authority_ids[0].clone()],
|
||||
@@ -580,8 +584,10 @@ mod tests {
|
||||
futures::executor::block_on(async move {
|
||||
let (sender, mut receiver) = mpsc::channel(1);
|
||||
|
||||
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await;
|
||||
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await;
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
|
||||
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await;
|
||||
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await;
|
||||
|
||||
let (ns, ads) = service.on_request(
|
||||
vec![authority_ids[0].clone(), authority_ids[2].clone()],
|
||||
@@ -645,7 +651,8 @@ mod tests {
|
||||
futures::executor::block_on(async move {
|
||||
let (sender, mut receiver) = mpsc::channel(1);
|
||||
|
||||
service.on_peer_connected(validator_peer_id.clone(), PeerSet::Validation, &mut ads).await;
|
||||
let maybe_authority = ads.get_authority_id_by_peer_id(validator_peer_id).await;
|
||||
service.on_peer_connected(validator_peer_id.clone(), PeerSet::Validation, maybe_authority).await;
|
||||
|
||||
let address = known_multiaddr()[0].clone().with(Protocol::P2p(validator_peer_id.clone().into()));
|
||||
ads.by_peer_id.insert(validator_peer_id.clone(), validator_id.clone());
|
||||
|
||||
Reference in New Issue
Block a user