collator-protocol: do not connect to the next group (#4261)

* collator-protocol: do not connect to the next group

* fmt
This commit is contained in:
Andronik Ordian
2021-11-11 22:38:41 +01:00
committed by GitHub
parent b0f89bbfbc
commit 760d6804dc
4 changed files with 31 additions and 82 deletions
@@ -379,11 +379,11 @@ where
},
};
// Determine the group on that core and the next group on that core.
let (current_validators, next_validators) =
// Determine the group on that core.
let current_validators =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;
if current_validators.validators.is_empty() && next_validators.validators.is_empty() {
if current_validators.validators.is_empty() {
tracing::warn!(
target: LOG_TARGET,
core = ?our_core,
@@ -401,23 +401,14 @@ where
pov_hash = ?pov.hash(),
core = ?our_core,
?current_validators,
?next_validators,
"Accepted collation, connecting to validators."
);
let validator_group: HashSet<_> =
current_validators.validators.iter().map(Clone::clone).collect();
// Issue a discovery request for the validators of the current group and the next group:
connect_to_validators(
ctx,
current_validators
.validators
.into_iter()
.chain(next_validators.validators.into_iter())
.collect(),
)
.await;
// Issue a discovery request for the validators of the current group:
connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await;
state.our_validators_groups.insert(relay_parent, validator_group.into());
@@ -471,16 +462,16 @@ struct GroupValidators {
validators: Vec<AuthorityDiscoveryId>,
}
/// Figure out current and next group of validators assigned to the para being collated on.
/// Figure out current group of validators assigned to the para being collated on.
///
/// Returns [`ValidatorId`]'s of current and next group as determined based on the `relay_parent`.
/// Returns [`ValidatorId`]'s of current group as determined based on the `relay_parent`.
async fn determine_our_validators<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
core_index: CoreIndex,
cores: usize,
relay_parent: Hash,
) -> Result<(GroupValidators, GroupValidators)>
) -> Result<GroupValidators>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
@@ -500,22 +491,15 @@ where
.map(|v| v.as_slice())
.unwrap_or_default();
let next_group_idx = (current_group_index.0 as usize + 1) % groups.len();
let next_validators = groups.get(next_group_idx).map(|v| v.as_slice()).unwrap_or_default();
let validators = &info.discovery_keys;
let current_validators =
current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
let next_validators =
next_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
let current_validators =
GroupValidators { group: current_group_index, validators: current_validators };
let next_validators =
GroupValidators { group: GroupIndex(next_group_idx as u32), validators: next_validators };
Ok((current_validators, next_validators))
Ok(current_validators)
}
/// Issue a `Declare` collation message to the given `peer`.
@@ -332,14 +332,12 @@ impl Default for PeerData {
struct GroupAssignments {
current: Option<ParaId>,
next: Option<ParaId>,
}
#[derive(Default)]
struct ActiveParas {
relay_parent_assignments: HashMap<Hash, GroupAssignments>,
current_assignments: HashMap<ParaId, usize>,
next_assignments: HashMap<ParaId, usize>,
}
impl ActiveParas {
@@ -384,22 +382,16 @@ impl ActiveParas {
},
};
let (para_now, para_next) =
let para_now =
match polkadot_node_subsystem_util::signing_key_and_index(&validators, keystore)
.await
.and_then(|(_, index)| {
polkadot_node_subsystem_util::find_validator_group(&groups, index)
}) {
Some(group) => {
let next_rotation_info = rotation_info.bump_rotation();
let core_now = rotation_info.core_for_group(group, cores.len());
let core_next = next_rotation_info.core_for_group(group, cores.len());
(
cores.get(core_now.0 as usize).and_then(|c| c.para_id()),
cores.get(core_next.0 as usize).and_then(|c| c.para_id()),
)
cores.get(core_now.0 as usize).and_then(|c| c.para_id())
},
None => {
tracing::trace!(target: LOG_TARGET, ?relay_parent, "Not a validator");
@@ -429,19 +421,15 @@ impl ActiveParas {
}
}
if let Some(para_next) = para_next {
*self.next_assignments.entry(para_next).or_default() += 1;
}
self.relay_parent_assignments
.insert(relay_parent, GroupAssignments { current: para_now, next: para_next });
.insert(relay_parent, GroupAssignments { current: para_now });
}
}
fn remove_outgoing(&mut self, old_relay_parents: impl IntoIterator<Item = Hash>) {
for old_relay_parent in old_relay_parents {
if let Some(assignments) = self.relay_parent_assignments.remove(&old_relay_parent) {
let GroupAssignments { current, next } = assignments;
let GroupAssignments { current } = assignments;
if let Some(cur) = current {
if let Entry::Occupied(mut occupied) = self.current_assignments.entry(cur) {
@@ -456,23 +444,10 @@ impl ActiveParas {
}
}
}
if let Some(next) = next {
if let Entry::Occupied(mut occupied) = self.next_assignments.entry(next) {
*occupied.get_mut() -= 1;
if *occupied.get() == 0 {
occupied.remove_entry();
}
}
}
}
}
}
fn is_current_or_next(&self, id: ParaId) -> bool {
self.current_assignments.contains_key(&id) || self.next_assignments.contains_key(&id)
}
fn is_current(&self, id: &ParaId) -> bool {
self.current_assignments.contains_key(id)
}
@@ -846,13 +821,13 @@ async fn process_incoming_peer_message<Context>(
return
}
if state.active_paras.is_current_or_next(para_id) {
if state.active_paras.is_current(&para_id) {
tracing::debug!(
target: LOG_TARGET,
peer_id = ?origin,
?collator_id,
?para_id,
"Declared as collator for current or next para",
"Declared as collator for current para",
);
peer_data.set_collating(collator_id, para_id);
@@ -895,20 +870,6 @@ async fn process_incoming_peer_message<Context>(
Some(p) => p,
};
if let PeerState::Collating(ref collating_state) = peer_data.state {
let para_id = collating_state.para_id;
if !state.active_paras.is_current(&para_id) {
tracing::debug!(
target: LOG_TARGET,
peer_id = ?origin,
%para_id,
?relay_parent,
"Received advertise collation, but we are assigned to the next group",
);
return
}
}
match peer_data.insert_advertisement(relay_parent, &state.view) {
Ok((id, para_id)) => {
tracing::debug!(
@@ -1015,7 +976,7 @@ where
// If the peer hasn't declared yet, they will be disconnected if they do not
// declare.
if let Some(para_id) = peer_data.collating_para() {
if !state.active_paras.is_current_or_next(para_id) {
if !state.active_paras.is_current(&para_id) {
tracing::trace!(target: LOG_TARGET, "Disconnecting peer on view change");
disconnect_peer(ctx, peer_id.clone()).await;
}
@@ -675,7 +675,7 @@ fn fetch_collations_works() {
}
#[test]
fn dont_fetch_collation_if_assigned_to_next_group() {
fn reject_connection_to_next_group() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
@@ -701,15 +701,19 @@ fn dont_fetch_collation_if_assigned_to_next_group() {
)
.await;
advertise_collation(&mut virtual_overseer, peer_b.clone(), test_state.relay_parent).await;
assert!(
overseer_recv_with_timeout(&mut &mut virtual_overseer, Duration::from_millis(30))
.await
.is_none(),
"There should be no PoV fetching request.",
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(
peer,
rep,
)) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_UNNEEDED_COLLATOR);
}
);
assert_collator_disconnect(&mut virtual_overseer, peer_b).await;
virtual_overseer
})
}