validator_discovery: cache by (Hash, ParaId) (#2402)

This commit is contained in:
Andronik Ordian
2021-02-09 21:42:46 +01:00
committed by GitHub
parent fab68efac5
commit 89e8e81336
3 changed files with 128 additions and 38 deletions
@@ -279,7 +279,13 @@ async fn distribute_collation(
}
// Issue a discovery request for the validators of the current group and the next group.
connect_to_validators(ctx, relay_parent, state, current_validators.union(&next_validators).cloned().collect()).await?;
connect_to_validators(
ctx,
relay_parent,
id,
state,
current_validators.union(&next_validators).cloned().collect(),
).await?;
state.our_validators_groups.insert(relay_parent, current_validators.into());
@@ -360,6 +366,7 @@ async fn declare(
async fn connect_to_validators(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
relay_parent: Hash,
para_id: ParaId,
state: &mut State,
validators: Vec<ValidatorId>,
) -> Result<()> {
@@ -370,7 +377,7 @@ async fn connect_to_validators(
PeerSet::Collation,
).await?;
state.connection_requests.put(relay_parent, request);
state.connection_requests.put(relay_parent, para_id, request);
Ok(())
}
@@ -680,7 +687,7 @@ async fn handle_our_view_change(
for removed in state.view.difference(&view) {
state.collations.remove(removed);
state.our_validators_groups.remove(removed);
state.connection_requests.remove(removed);
state.connection_requests.remove_all(removed);
state.span_per_relay_parent.remove(removed);
}
@@ -193,7 +193,7 @@ async fn handle_signal(
}
for relay_parent in deactivated {
state.connection_requests.remove(&relay_parent);
state.connection_requests.remove_all(&relay_parent);
state.relay_parent_state.remove(&relay_parent);
}
@@ -303,13 +303,14 @@ async fn connect_to_relevant_validators(
relay_parent: Hash,
descriptor: &CandidateDescriptor,
) {
let para_id = descriptor.para_id;
if let Ok(Some(relevant_validators)) =
determine_relevant_validators(ctx, relay_parent, descriptor.para_id).await
determine_relevant_validators(ctx, relay_parent, para_id).await
{
// We only need one connection request per (relay_parent, para_id)
// so here we take this shortcut to avoid calling `connect_to_validators`
// more than once.
if !connection_requests.contains_request(&relay_parent) {
if !connection_requests.contains_request(&relay_parent, para_id) {
tracing::debug!(target: LOG_TARGET, validators=?relevant_validators, "connecting to validators");
match validator_discovery::connect_to_validators(
ctx,
@@ -318,7 +319,7 @@ async fn connect_to_relevant_validators(
PeerSet::Validation,
).await {
Ok(new_connection_request) => {
connection_requests.put(relay_parent, new_connection_request);
connection_requests.put(relay_parent, para_id, new_connection_request);
}
Err(e) => {
tracing::debug!(
@@ -32,7 +32,9 @@ use polkadot_node_subsystem::{
messages::{AllMessages, NetworkBridgeMessage},
SubsystemContext,
};
use polkadot_primitives::v1::{Hash, ValidatorId, AuthorityDiscoveryId, SessionIndex};
use polkadot_primitives::v1::{
Hash, ValidatorId, AuthorityDiscoveryId, SessionIndex, Id as ParaId,
};
use polkadot_node_network_protocol::peer_set::PeerSet;
use sc_network::PeerId;
use crate::Error;
@@ -140,6 +142,8 @@ async fn connect_to_authorities<Context: SubsystemContext>(
pub struct DiscoveredValidator {
/// The relay parent associated with the connection request that returned a result.
pub relay_parent: Hash,
/// The para ID associated with the connection request that returned a result.
pub para_id: ParaId,
/// The [`ValidatorId`] that was resolved.
pub validator_id: ValidatorId,
/// The [`PeerId`] associated to the validator id.
@@ -147,12 +151,13 @@ pub struct DiscoveredValidator {
}
/// Used by [`ConnectionRequests::requests`] to map a [`ConnectionRequest`] item to a [`DiscoveredValidator`].
struct ConnectionRequestForRelayParent {
struct ConnectionRequestForRelayParentAndParaId {
request: ConnectionRequest,
relay_parent: Hash,
para_id: ParaId,
}
impl stream::Stream for ConnectionRequestForRelayParent {
impl stream::Stream for ConnectionRequestForRelayParentAndParaId {
type Item = DiscoveredValidator;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
@@ -162,45 +167,63 @@ impl stream::Stream for ConnectionRequestForRelayParent {
validator_id,
peer_id,
relay_parent: self.relay_parent,
para_id: self.para_id,
}))
}
}
/// A struct that assists performing multiple concurrent connection requests.
///
/// This allows concurrent connections to validator sets at different `relay_parents`.
/// This allows concurrent connections to validator sets at different `(relay_parents, para_id)`.
/// Use [`ConnectionRequests::next`] to wait for results of the added connection requests.
#[derive(Default)]
pub struct ConnectionRequests {
/// Connection requests relay_parent -> StreamUnordered token
id_map: HashMap<Hash, usize>,
/// Connection requests relay_parent -> para_id -> StreamUnordered token
///
/// Q: Why not (relay_parent, para_id) -> Stream?
/// A: So that we can remove from it by relay_parent only.
id_map: HashMap<Hash, HashMap<ParaId, usize>>,
/// Connection requests themselves.
requests: StreamUnordered<ConnectionRequestForRelayParent>,
requests: StreamUnordered<ConnectionRequestForRelayParentAndParaId>,
}
impl ConnectionRequests {
/// Insert a new connection request.
///
/// If a `ConnectionRequest` under a given `relay_parent` already exists it will
/// be revoked and substituted with the given one.
pub fn put(&mut self, relay_parent: Hash, request: ConnectionRequest) {
self.remove(&relay_parent);
let token = self.requests.push(ConnectionRequestForRelayParent { relay_parent, request });
/// If a `ConnectionRequest` under a given `relay_parent` and `para_id` already exists,
/// it will be revoked and substituted with the given one.
pub fn put(&mut self, relay_parent: Hash, para_id: ParaId, request: ConnectionRequest) {
self.remove(&relay_parent, para_id);
let token = self.requests.push(ConnectionRequestForRelayParentAndParaId {
relay_parent,
para_id,
request,
});
self.id_map.insert(relay_parent, token);
self.id_map.entry(relay_parent).or_default().insert(para_id, token);
}
/// Remove a connection request by a given `relay_parent`.
pub fn remove(&mut self, relay_parent: &Hash) {
if let Some(token) = self.id_map.remove(relay_parent) {
/// Remove all connection requests by a given `relay_parent`.
pub fn remove_all(&mut self, relay_parent: &Hash) {
let map = self.id_map.remove(relay_parent);
for token in map.map(|m| m.into_iter().map(|(_, v)| v)).into_iter().flatten() {
Pin::new(&mut self.requests).remove(token);
}
}
/// Is a connection at this relay parent already present in the request
pub fn contains_request(&self, relay_parent: &Hash) -> bool {
self.id_map.contains_key(relay_parent)
/// Remove a connection request by a given `relay_parent` and `para_id`.
pub fn remove(&mut self, relay_parent: &Hash, para_id: ParaId) {
if let Some(map) = self.id_map.get_mut(relay_parent) {
if let Some(token) = map.remove(&para_id) {
Pin::new(&mut self.requests).remove(token);
}
}
}
/// Is a connection at this relay parent and para_id already present in the request
pub fn contains_request(&self, relay_parent: &Hash, para_id: ParaId) -> bool {
self.id_map.get(relay_parent).map_or(false, |map| map.contains_key(&para_id))
}
/// Returns the next available connection request result.
@@ -298,8 +321,8 @@ mod tests {
};
let relay_parent_1 = Hash::repeat_byte(1);
connection_requests.put(relay_parent_1.clone(), connection_request_1);
let para_id = ParaId::from(3);
connection_requests.put(relay_parent_1.clone(), para_id, connection_request_1);
rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap();
rq1_tx.send((auth_2, peer_id_2.clone())).await.unwrap();
@@ -307,13 +330,13 @@ mod tests {
let res = connection_requests.next().await;
assert_eq!(
res,
DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_1, peer_id: peer_id_1 },
DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_1, peer_id: peer_id_1 },
);
let res = connection_requests.next().await;
assert_eq!(
res,
DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_2, peer_id: peer_id_2 },
DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_2, peer_id: peer_id_2 },
);
check_next_is_pending(&mut connection_requests).await;
@@ -358,9 +381,10 @@ mod tests {
let relay_parent_1 = Hash::repeat_byte(1);
let relay_parent_2 = Hash::repeat_byte(2);
let para_id = ParaId::from(3);
connection_requests.put(relay_parent_1.clone(), connection_request_1);
connection_requests.put(relay_parent_2.clone(), connection_request_2);
connection_requests.put(relay_parent_1.clone(), para_id, connection_request_1);
connection_requests.put(relay_parent_2.clone(), para_id, connection_request_2);
rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap();
rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap();
@@ -368,13 +392,70 @@ mod tests {
let res = connection_requests.next().await;
assert_eq!(
res,
DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_1, peer_id: peer_id_1 },
DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_1, peer_id: peer_id_1 },
);
let res = connection_requests.next().await;
assert_eq!(
res,
DiscoveredValidator { relay_parent: relay_parent_2, validator_id: validator_2, peer_id: peer_id_2 },
DiscoveredValidator { relay_parent: relay_parent_2, para_id, validator_id: validator_2, peer_id: peer_id_2 },
);
check_next_is_pending(&mut connection_requests).await;
});
}
#[test]
fn same_relay_parent_diffent_para_ids() {
let mut connection_requests = ConnectionRequests::default();
executor::block_on(async move {
check_next_is_pending(&mut connection_requests).await;
let validator_1 = ValidatorPair::generate().0.public();
let validator_2 = ValidatorPair::generate().0.public();
let auth_1 = AuthorityDiscoveryId::from_slice(&[1; 32]);
let auth_2 = AuthorityDiscoveryId::from_slice(&[2; 32]);
let mut validator_map_1 = HashMap::new();
let mut validator_map_2 = HashMap::new();
validator_map_1.insert(auth_1.clone(), validator_1.clone());
validator_map_2.insert(auth_2.clone(), validator_2.clone());
let (mut rq1_tx, rq1_rx) = mpsc::channel(8);
let (mut rq2_tx, rq2_rx) = mpsc::channel(8);
let peer_id_1 = PeerId::random();
let peer_id_2 = PeerId::random();
let connection_request_1 = ConnectionRequest {
validator_map: validator_map_1,
connections: rq1_rx,
};
let connection_request_2 = ConnectionRequest {
validator_map: validator_map_2,
connections: rq2_rx,
};
let relay_parent = Hash::repeat_byte(1);
let para_id_1 = ParaId::from(1);
let para_id_2 = ParaId::from(2);
connection_requests.put(relay_parent.clone(), para_id_1, connection_request_1);
connection_requests.put(relay_parent.clone(), para_id_2, connection_request_2);
rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap();
rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap();
connection_requests.remove(&relay_parent, para_id_1);
let res = connection_requests.next().await;
assert_eq!(
res,
DiscoveredValidator { relay_parent, para_id: para_id_2, validator_id: validator_2, peer_id: peer_id_2 },
);
check_next_is_pending(&mut connection_requests).await;
@@ -418,22 +499,23 @@ mod tests {
};
let relay_parent = Hash::repeat_byte(3);
let para_id = ParaId::from(3);
connection_requests.put(relay_parent.clone(), connection_request_1);
connection_requests.put(relay_parent.clone(), para_id, connection_request_1);
rq1_tx.send((auth_1.clone(), peer_id_1.clone())).await.unwrap();
let res = connection_requests.next().await;
assert_eq!(res, DiscoveredValidator { relay_parent, validator_id: validator_1, peer_id: peer_id_1.clone() });
assert_eq!(res, DiscoveredValidator { relay_parent, para_id, validator_id: validator_1, peer_id: peer_id_1.clone() });
connection_requests.put(relay_parent.clone(), connection_request_2);
connection_requests.put(relay_parent.clone(), para_id, connection_request_2);
assert!(rq1_tx.send((auth_1, peer_id_1.clone())).await.is_err());
rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap();
let res = connection_requests.next().await;
assert_eq!(res, DiscoveredValidator { relay_parent, validator_id: validator_2, peer_id: peer_id_2 });
assert_eq!(res, DiscoveredValidator { relay_parent, para_id, validator_id: validator_2, peer_id: peer_id_2 });
check_next_is_pending(&mut connection_requests).await;
});