Add NetworkBridgeEvent::UpdatedAuthorityIds (#6227)

* Add NetworkBridgeEvent::UpdatedAuthorityIds

* update collator

* informing subsystems

* remove outdated authority data

* docs

* remove accidentally added line

* update docs

* emit event on session change

* emit

* check for update and update

* Update node/network/gossip-support/src/lib.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/network/approval-distribution/src/lib.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/network/bitfield-distribution/src/lib.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/network/bridge/src/rx/mod.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/network/collator-protocol/src/validator_side/mod.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/network/collator-protocol/src/collator_side/mod.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/network/bridge/src/rx/mod.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/subsystem-types/src/messages.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/subsystem-types/src/messages.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/network/statement-distribution/src/lib.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/network/statement-distribution/src/lib.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/subsystem-types/src/messages/network_bridge_event.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/subsystem-types/src/messages.rs

Co-authored-by: Andronik <write@reusable.software>

* fixes

* merge fixes

* make clippy happy again

* fix

---------

Co-authored-by: Andronik <write@reusable.software>
This commit is contained in:
Sergej Sakac
2023-07-21 11:48:15 -07:00
committed by GitHub
parent a200d4c9d7
commit ac253c7139
11 changed files with 123 additions and 3 deletions
+1
View File
@@ -7175,6 +7175,7 @@ dependencies = [
"rand 0.8.5",
"rand_chacha 0.3.1",
"sc-network",
"sc-network-common",
"sp-application-crypto",
"sp-authority-discovery",
"sp-consensus-babe",
@@ -389,6 +389,9 @@ impl State {
live
});
},
NetworkBridgeEvent::UpdatedAuthorityIds { .. } => {
// The approval-distribution subsystem doesn't deal with `AuthorityDiscoveryId`s.
},
NetworkBridgeEvent::PeerMessage(peer_id, Versioned::V1(msg)) => {
self.process_incoming_peer_message(ctx, metrics, peer_id, msg, rng).await;
},
@@ -698,6 +698,9 @@ async fn handle_network_msg<Context>(
},
NetworkBridgeEvent::PeerMessage(remote, Versioned::V1(message)) =>
process_incoming_peer_message(ctx, state, metrics, remote, message, rng).await,
NetworkBridgeEvent::UpdatedAuthorityIds { .. } => {
// The bitfield-distribution subsystem doesn't deal with `AuthorityDiscoveryId`s.
},
}
}
+22 -1
View File
@@ -371,7 +371,7 @@ where
Ok(v) => v,
};
// non-decoded, but version-checked colldation messages.
// non-decoded, but version-checked collation messages.
let c_messages: Result<Vec<_>, _> = messages
.iter()
.filter_map(|(protocol, msg_bytes)| {
@@ -555,6 +555,27 @@ where
ctx.sender(),
);
},
FromOrchestra::Communication {
msg: NetworkBridgeRxMessage::UpdatedAuthorityIds { peer_id, authority_ids },
} => {
gum::debug!(
target: LOG_TARGET,
action = "UpdatedAuthorityIds",
?peer_id,
?authority_ids,
"`AuthorityDiscoveryId`s have changed",
);
// using unbounded send to avoid cycles
// the messages are sent only once per session up to one per peer
dispatch_collation_event_to_all_unbounded(
NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids.clone()),
ctx.sender(),
);
dispatch_validation_event_to_all_unbounded(
NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids),
ctx.sender(),
);
},
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(active_leaves)) => {
let ActiveLeavesUpdate { activated, deactivated } = active_leaves;
@@ -907,6 +907,10 @@ async fn handle_network_msg<Context>(
PeerMessage(remote, Versioned::V1(msg)) => {
handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
},
UpdatedAuthorityIds(peer_id, authority_ids) => {
gum::trace!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Updated authority ids");
state.peer_ids.insert(peer_id, authority_ids);
},
NewGossipTopology { .. } => {
// impossible!
},
@@ -1131,6 +1131,9 @@ async fn handle_network_msg<Context>(
PeerMessage(remote, Versioned::V1(msg)) => {
process_incoming_peer_message(ctx, state, remote, msg).await;
},
UpdatedAuthorityIds { .. } => {
// The validator side doesn't deal with `AuthorityDiscoveryId`s.
},
}
Ok(())
@@ -9,6 +9,7 @@ sp-application-crypto = { git = "https://github.com/paritytech/substrate", branc
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network-common = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-network-protocol = { path = "../protocol" }
polkadot-node-subsystem = { path = "../../subsystem" }
@@ -35,7 +35,7 @@ use futures_timer::Delay;
use rand::{seq::SliceRandom as _, SeedableRng};
use rand_chacha::ChaCha20Rng;
use sc_network::Multiaddr;
use sc_network::{config::parse_addr, Multiaddr};
use sp_application_crypto::{AppCrypto, ByteArray};
use sp_keystore::{Keystore, KeystorePtr};
@@ -265,11 +265,13 @@ where
update_gossip_topology(
sender,
our_index,
session_info.discovery_keys,
session_info.discovery_keys.clone(),
relay_parent,
session_index,
)
.await?;
self.update_authority_ids(sender, session_info.discovery_keys).await;
}
}
}
@@ -383,6 +385,45 @@ where
};
}
async fn update_authority_ids<Sender>(
&mut self,
sender: &mut Sender,
authorities: Vec<AuthorityDiscoveryId>,
) where
Sender: overseer::GossipSupportSenderTrait,
{
let mut authority_ids: HashMap<PeerId, HashSet<AuthorityDiscoveryId>> = HashMap::new();
for authority in authorities {
let peer_id = self
.authority_discovery
.get_addresses_by_authority_id(authority.clone())
.await
.into_iter()
.flat_map(|list| list.into_iter())
.find_map(|addr| parse_addr(addr).ok().map(|(p, _)| p));
if let Some(p) = peer_id {
authority_ids.entry(p).or_default().insert(authority);
}
}
for (peer_id, auths) in authority_ids {
if self.connected_authorities_by_peer_id.get(&peer_id) != Some(&auths) {
sender
.send_message(NetworkBridgeRxMessage::UpdatedAuthorityIds {
peer_id,
authority_ids: auths.clone(),
})
.await;
auths.iter().for_each(|a| {
self.connected_authorities.insert(a.clone(), peer_id);
});
self.connected_authorities_by_peer_id.insert(peer_id, auths);
}
}
}
fn handle_connect_disconnect(&mut self, ev: NetworkBridgeEvent<GossipSupportNetworkMessage>) {
match ev {
NetworkBridgeEvent::PeerConnected(peer_id, _, _, o_authority) => {
@@ -401,6 +442,9 @@ where
});
}
},
NetworkBridgeEvent::UpdatedAuthorityIds(_, _) => {
// The `gossip-support` subsystem itself issues these messages.
},
NetworkBridgeEvent::OurViewChange(_) => {},
NetworkBridgeEvent::PeerViewChange(_, _) => {},
NetworkBridgeEvent::NewGossipTopology { .. } => {},
@@ -1754,6 +1754,34 @@ async fn handle_network_update<Context, R>(
NetworkBridgeEvent::OurViewChange(_view) => {
// handled by `ActiveLeavesUpdate`
},
NetworkBridgeEvent::UpdatedAuthorityIds(peer, authority_ids) => {
gum::trace!(
target: LOG_TARGET,
?peer,
?authority_ids,
"Updated `AuthorityDiscoveryId`s"
);
// get the outdated authority_ids stored for the specific peer_id.
let old_auth_ids: Vec<AuthorityDiscoveryId> = authorities
.into_iter()
.filter(|(_, p)| **p == peer)
.map(|(auth, _)| auth.clone())
.collect();
// remove all of the outdated authority_ids.
for auth in old_auth_ids {
authorities.remove(&auth);
}
// update `authorities` with the new updated data.
authority_ids.clone().into_iter().for_each(|a| {
authorities.insert(a, peer);
});
if let Some(data) = peers.get_mut(&peer) {
data.maybe_authority = Some(authority_ids);
}
},
}
}
@@ -302,6 +302,13 @@ pub enum NetworkBridgeRxMessage {
/// to the index in `canonical_shuffling`
shuffled_indices: Vec<usize>,
},
/// Inform the distribution subsystems about `AuthorityDiscoveryId` key rotations.
UpdatedAuthorityIds {
/// The `PeerId` of the peer that updated its `AuthorityDiscoveryId`s.
peer_id: PeerId,
/// The updated authority discovery keys of the peer.
authority_ids: HashSet<AuthorityDiscoveryId>,
},
}
/// Type of peer reporting
@@ -61,6 +61,9 @@ pub enum NetworkBridgeEvent<M> {
/// Our view has changed.
OurViewChange(OurView),
/// The authority discovery session key has been rotated.
UpdatedAuthorityIds(PeerId, HashSet<AuthorityDiscoveryId>),
}
impl<M> NetworkBridgeEvent<M> {
@@ -101,6 +104,8 @@ impl<M> NetworkBridgeEvent<M> {
NetworkBridgeEvent::PeerViewChange(*peer, view.clone()),
NetworkBridgeEvent::OurViewChange(ref view) =>
NetworkBridgeEvent::OurViewChange(view.clone()),
NetworkBridgeEvent::UpdatedAuthorityIds(ref peer, ref authority_ids) =>
NetworkBridgeEvent::UpdatedAuthorityIds(*peer, authority_ids.clone()),
})
}
}