diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 7365cda530..7cbb312498 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -279,7 +279,13 @@ async fn distribute_collation( } // Issue a discovery request for the validators of the current group and the next group. - connect_to_validators(ctx, relay_parent, state, current_validators.union(&next_validators).cloned().collect()).await?; + connect_to_validators( + ctx, + relay_parent, + id, + state, + current_validators.union(&next_validators).cloned().collect(), + ).await?; state.our_validators_groups.insert(relay_parent, current_validators.into()); @@ -360,6 +366,7 @@ async fn declare( async fn connect_to_validators( ctx: &mut impl SubsystemContext, relay_parent: Hash, + para_id: ParaId, state: &mut State, validators: Vec, ) -> Result<()> { @@ -370,7 +377,7 @@ async fn connect_to_validators( PeerSet::Collation, ).await?; - state.connection_requests.put(relay_parent, request); + state.connection_requests.put(relay_parent, para_id, request); Ok(()) } @@ -680,7 +687,7 @@ async fn handle_our_view_change( for removed in state.view.difference(&view) { state.collations.remove(removed); state.our_validators_groups.remove(removed); - state.connection_requests.remove(removed); + state.connection_requests.remove_all(removed); state.span_per_relay_parent.remove(removed); } diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs index d09641407c..3df606983e 100644 --- a/polkadot/node/network/pov-distribution/src/lib.rs +++ b/polkadot/node/network/pov-distribution/src/lib.rs @@ -193,7 +193,7 @@ async fn handle_signal( } for relay_parent in deactivated { - state.connection_requests.remove(&relay_parent); + state.connection_requests.remove_all(&relay_parent); state.relay_parent_state.remove(&relay_parent); } @@ -303,13 +303,14 @@ async fn connect_to_relevant_validators( relay_parent: Hash, descriptor: &CandidateDescriptor, ) { + let para_id = descriptor.para_id; if let Ok(Some(relevant_validators)) = - determine_relevant_validators(ctx, relay_parent, descriptor.para_id).await + determine_relevant_validators(ctx, relay_parent, para_id).await { // We only need one connection request per (relay_parent, para_id) // so here we take this shortcut to avoid calling `connect_to_validators` // more than once. - if !connection_requests.contains_request(&relay_parent) { + if !connection_requests.contains_request(&relay_parent, para_id) { tracing::debug!(target: LOG_TARGET, validators=?relevant_validators, "connecting to validators"); match validator_discovery::connect_to_validators( ctx, @@ -318,7 +319,7 @@ async fn connect_to_relevant_validators( PeerSet::Validation, ).await { Ok(new_connection_request) => { - connection_requests.put(relay_parent, new_connection_request); + connection_requests.put(relay_parent, para_id, new_connection_request); } Err(e) => { tracing::debug!( diff --git a/polkadot/node/subsystem-util/src/validator_discovery.rs b/polkadot/node/subsystem-util/src/validator_discovery.rs index a16af82b82..9219746335 100644 --- a/polkadot/node/subsystem-util/src/validator_discovery.rs +++ b/polkadot/node/subsystem-util/src/validator_discovery.rs @@ -32,7 +32,9 @@ use polkadot_node_subsystem::{ messages::{AllMessages, NetworkBridgeMessage}, SubsystemContext, }; -use polkadot_primitives::v1::{Hash, ValidatorId, AuthorityDiscoveryId, SessionIndex}; +use polkadot_primitives::v1::{ + Hash, ValidatorId, AuthorityDiscoveryId, SessionIndex, Id as ParaId, +}; use polkadot_node_network_protocol::peer_set::PeerSet; use sc_network::PeerId; use crate::Error; @@ -140,6 +142,8 @@ async fn connect_to_authorities( pub struct DiscoveredValidator { /// The relay parent associated with the connection request that returned a result. pub relay_parent: Hash, + /// The para ID associated with the connection request that returned a result. + pub para_id: ParaId, /// The [`ValidatorId`] that was resolved. pub validator_id: ValidatorId, /// The [`PeerId`] associated to the validator id. @@ -147,12 +151,13 @@ pub struct DiscoveredValidator { } /// Used by [`ConnectionRequests::requests`] to map a [`ConnectionRequest`] item to a [`DiscoveredValidator`]. -struct ConnectionRequestForRelayParent { +struct ConnectionRequestForRelayParentAndParaId { request: ConnectionRequest, relay_parent: Hash, + para_id: ParaId, } -impl stream::Stream for ConnectionRequestForRelayParent { +impl stream::Stream for ConnectionRequestForRelayParentAndParaId { type Item = DiscoveredValidator; fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll> { @@ -162,45 +167,63 @@ impl stream::Stream for ConnectionRequestForRelayParent { validator_id, peer_id, relay_parent: self.relay_parent, + para_id: self.para_id, })) } } /// A struct that assists performing multiple concurrent connection requests. /// -/// This allows concurrent connections to validator sets at different `relay_parents`. +/// This allows concurrent connections to validator sets at different `(relay_parents, para_id)`. /// Use [`ConnectionRequests::next`] to wait for results of the added connection requests. #[derive(Default)] pub struct ConnectionRequests { - /// Connection requests relay_parent -> StreamUnordered token - id_map: HashMap, + /// Connection requests relay_parent -> para_id -> StreamUnordered token + /// + /// Q: Why not (relay_parent, para_id) -> Stream? + /// A: So that we can remove from it by relay_parent only. + id_map: HashMap>, /// Connection requests themselves. - requests: StreamUnordered, + requests: StreamUnordered, } impl ConnectionRequests { /// Insert a new connection request. /// - /// If a `ConnectionRequest` under a given `relay_parent` already exists it will - /// be revoked and substituted with the given one. - pub fn put(&mut self, relay_parent: Hash, request: ConnectionRequest) { - self.remove(&relay_parent); - let token = self.requests.push(ConnectionRequestForRelayParent { relay_parent, request }); + /// If a `ConnectionRequest` under a given `relay_parent` and `para_id` already exists, + /// it will be revoked and substituted with the given one. + pub fn put(&mut self, relay_parent: Hash, para_id: ParaId, request: ConnectionRequest) { + self.remove(&relay_parent, para_id); + let token = self.requests.push(ConnectionRequestForRelayParentAndParaId { + relay_parent, + para_id, + request, + }); - self.id_map.insert(relay_parent, token); + self.id_map.entry(relay_parent).or_default().insert(para_id, token); } - /// Remove a connection request by a given `relay_parent`. - pub fn remove(&mut self, relay_parent: &Hash) { - if let Some(token) = self.id_map.remove(relay_parent) { + /// Remove all connection requests by a given `relay_parent`. + pub fn remove_all(&mut self, relay_parent: &Hash) { + let map = self.id_map.remove(relay_parent); + for token in map.map(|m| m.into_iter().map(|(_, v)| v)).into_iter().flatten() { Pin::new(&mut self.requests).remove(token); } } - /// Is a connection at this relay parent already present in the request - pub fn contains_request(&self, relay_parent: &Hash) -> bool { - self.id_map.contains_key(relay_parent) + /// Remove a connection request by a given `relay_parent` and `para_id`. + pub fn remove(&mut self, relay_parent: &Hash, para_id: ParaId) { + if let Some(map) = self.id_map.get_mut(relay_parent) { + if let Some(token) = map.remove(¶_id) { + Pin::new(&mut self.requests).remove(token); + } + } + } + + /// Is a connection at this relay parent and para_id already present in the request + pub fn contains_request(&self, relay_parent: &Hash, para_id: ParaId) -> bool { + self.id_map.get(relay_parent).map_or(false, |map| map.contains_key(¶_id)) } /// Returns the next available connection request result. @@ -298,8 +321,8 @@ mod tests { }; let relay_parent_1 = Hash::repeat_byte(1); - - connection_requests.put(relay_parent_1.clone(), connection_request_1); + let para_id = ParaId::from(3); + connection_requests.put(relay_parent_1.clone(), para_id, connection_request_1); rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap(); rq1_tx.send((auth_2, peer_id_2.clone())).await.unwrap(); @@ -307,13 +330,13 @@ mod tests { let res = connection_requests.next().await; assert_eq!( res, - DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_1, peer_id: peer_id_1 }, + DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_1, peer_id: peer_id_1 }, ); let res = connection_requests.next().await; assert_eq!( res, - DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_2, peer_id: peer_id_2 }, + DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_2, peer_id: peer_id_2 }, ); check_next_is_pending(&mut connection_requests).await; @@ -358,9 +381,10 @@ mod tests { let relay_parent_1 = Hash::repeat_byte(1); let relay_parent_2 = Hash::repeat_byte(2); + let para_id = ParaId::from(3); - connection_requests.put(relay_parent_1.clone(), connection_request_1); - connection_requests.put(relay_parent_2.clone(), connection_request_2); + connection_requests.put(relay_parent_1.clone(), para_id, connection_request_1); + connection_requests.put(relay_parent_2.clone(), para_id, connection_request_2); rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap(); rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap(); @@ -368,13 +392,70 @@ mod tests { let res = connection_requests.next().await; assert_eq!( res, - DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_1, peer_id: peer_id_1 }, + DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_1, peer_id: peer_id_1 }, ); let res = connection_requests.next().await; assert_eq!( res, - DiscoveredValidator { relay_parent: relay_parent_2, validator_id: validator_2, peer_id: peer_id_2 }, + DiscoveredValidator { relay_parent: relay_parent_2, para_id, validator_id: validator_2, peer_id: peer_id_2 }, + ); + + check_next_is_pending(&mut connection_requests).await; + }); + } + + #[test] + fn same_relay_parent_diffent_para_ids() { + let mut connection_requests = ConnectionRequests::default(); + + executor::block_on(async move { + check_next_is_pending(&mut connection_requests).await; + + let validator_1 = ValidatorPair::generate().0.public(); + let validator_2 = ValidatorPair::generate().0.public(); + + let auth_1 = AuthorityDiscoveryId::from_slice(&[1; 32]); + let auth_2 = AuthorityDiscoveryId::from_slice(&[2; 32]); + + let mut validator_map_1 = HashMap::new(); + let mut validator_map_2 = HashMap::new(); + + validator_map_1.insert(auth_1.clone(), validator_1.clone()); + validator_map_2.insert(auth_2.clone(), validator_2.clone()); + + let (mut rq1_tx, rq1_rx) = mpsc::channel(8); + let (mut rq2_tx, rq2_rx) = mpsc::channel(8); + + let peer_id_1 = PeerId::random(); + let peer_id_2 = PeerId::random(); + + let connection_request_1 = ConnectionRequest { + validator_map: validator_map_1, + connections: rq1_rx, + }; + + let connection_request_2 = ConnectionRequest { + validator_map: validator_map_2, + connections: rq2_rx, + }; + + let relay_parent = Hash::repeat_byte(1); + let para_id_1 = ParaId::from(1); + let para_id_2 = ParaId::from(2); + + connection_requests.put(relay_parent.clone(), para_id_1, connection_request_1); + connection_requests.put(relay_parent.clone(), para_id_2, connection_request_2); + + rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap(); + rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap(); + + connection_requests.remove(&relay_parent, para_id_1); + + let res = connection_requests.next().await; + assert_eq!( + res, + DiscoveredValidator { relay_parent, para_id: para_id_2, validator_id: validator_2, peer_id: peer_id_2 }, ); check_next_is_pending(&mut connection_requests).await; @@ -418,22 +499,23 @@ mod tests { }; let relay_parent = Hash::repeat_byte(3); + let para_id = ParaId::from(3); - connection_requests.put(relay_parent.clone(), connection_request_1); + connection_requests.put(relay_parent.clone(), para_id, connection_request_1); rq1_tx.send((auth_1.clone(), peer_id_1.clone())).await.unwrap(); let res = connection_requests.next().await; - assert_eq!(res, DiscoveredValidator { relay_parent, validator_id: validator_1, peer_id: peer_id_1.clone() }); + assert_eq!(res, DiscoveredValidator { relay_parent, para_id, validator_id: validator_1, peer_id: peer_id_1.clone() }); - connection_requests.put(relay_parent.clone(), connection_request_2); + connection_requests.put(relay_parent.clone(), para_id, connection_request_2); assert!(rq1_tx.send((auth_1, peer_id_1.clone())).await.is_err()); rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap(); let res = connection_requests.next().await; - assert_eq!(res, DiscoveredValidator { relay_parent, validator_id: validator_2, peer_id: peer_id_2 }); + assert_eq!(res, DiscoveredValidator { relay_parent, para_id, validator_id: validator_2, peer_id: peer_id_2 }); check_next_is_pending(&mut connection_requests).await; });