mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 13:27:57 +00:00
Fixes bug that collator wasn't sending Declare message (#1895)
* Fixes bug that collator wasn't sending `Declare` message * Apply suggestions from code review I'm an idiot
This commit is contained in:
@@ -563,11 +563,15 @@ async fn handle_validator_connected<Context>(
|
||||
ctx: &mut Context,
|
||||
state: &mut State,
|
||||
peer_id: PeerId,
|
||||
validator_id: ValidatorId,
|
||||
) -> Result<()>
|
||||
where
|
||||
Context: SubsystemContext<Message = CollatorProtocolMessage>
|
||||
{
|
||||
if !state.peer_views.contains_key(&peer_id) {
|
||||
// Check if the validator is already known or if maybe its peer id chaned(should not happen)
|
||||
let unknown = state.known_validators.insert(validator_id, peer_id.clone()).map(|o| o != peer_id).unwrap_or(true);
|
||||
|
||||
if unknown {
|
||||
// Only declare the new peers.
|
||||
declare(ctx, state, vec![peer_id.clone()]).await?;
|
||||
state.peer_views.insert(peer_id, Default::default());
|
||||
@@ -651,8 +655,7 @@ where
|
||||
loop {
|
||||
if let Some(mut request) = state.last_connection_request.take() {
|
||||
while let Poll::Ready(Some((validator_id, peer_id))) = futures::poll!(request.next()) {
|
||||
state.known_validators.insert(validator_id, peer_id.clone());
|
||||
if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id).await {
|
||||
if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id, validator_id).await {
|
||||
warn!(
|
||||
target: TARGET,
|
||||
"Failed to declare our collator id: {:?}",
|
||||
@@ -698,6 +701,7 @@ mod tests {
|
||||
use polkadot_subsystem::ActiveLeavesUpdate;
|
||||
use polkadot_node_subsystem_util::TimeoutExt;
|
||||
use polkadot_subsystem_testhelpers as test_helpers;
|
||||
use polkadot_node_network_protocol::ObservedRole;
|
||||
|
||||
#[derive(Default)]
|
||||
struct TestCandidateBuilder {
|
||||
@@ -889,9 +893,7 @@ mod tests {
|
||||
|
||||
test_harness(test_state.our_collator_pair.public(), |test_harness| async move {
|
||||
let current = test_state.relay_parent;
|
||||
let TestHarness {
|
||||
mut virtual_overseer,
|
||||
} = test_harness;
|
||||
let mut virtual_overseer = test_harness.virtual_overseer;
|
||||
|
||||
let pov_block = PoV {
|
||||
block_data: BlockData(vec![42, 43, 44]),
|
||||
@@ -976,10 +978,7 @@ mod tests {
|
||||
)) => {
|
||||
assert_eq!(relay_parent, current);
|
||||
assert_eq!(validators.len(), 4);
|
||||
assert!(validators.contains(&test_state.validator_public[2]));
|
||||
assert!(validators.contains(&test_state.validator_public[0]));
|
||||
assert!(validators.contains(&test_state.validator_public[4]));
|
||||
assert!(validators.contains(&test_state.validator_public[1]));
|
||||
assert!(validators.iter().all(|v| test_state.validator_public.contains(&v)));
|
||||
|
||||
let result = vec![
|
||||
Some(test_state.validator_authority_id[2].clone()),
|
||||
@@ -1002,10 +1001,7 @@ mod tests {
|
||||
}
|
||||
) => {
|
||||
assert_eq!(validator_ids.len(), 4);
|
||||
assert!(validator_ids.contains(&test_state.validator_authority_id[2]));
|
||||
assert!(validator_ids.contains(&test_state.validator_authority_id[0]));
|
||||
assert!(validator_ids.contains(&test_state.validator_authority_id[4]));
|
||||
assert!(validator_ids.contains(&test_state.validator_authority_id[1]));
|
||||
assert!(validator_ids.iter().all(|id| test_state.validator_authority_id.contains(id)));
|
||||
|
||||
let result = vec![
|
||||
(test_state.validator_authority_id[2].clone(), test_state.validator_peer_id[2].clone()),
|
||||
@@ -1014,9 +1010,7 @@ mod tests {
|
||||
(test_state.validator_authority_id[1].clone(), test_state.validator_peer_id[1].clone()),
|
||||
];
|
||||
|
||||
for (id, peer_id) in result.into_iter() {
|
||||
connected.try_send((id, peer_id)).unwrap();
|
||||
}
|
||||
result.into_iter().for_each(|r| connected.try_send(r).unwrap());
|
||||
}
|
||||
);
|
||||
|
||||
@@ -1251,10 +1245,7 @@ mod tests {
|
||||
)) => {
|
||||
assert_eq!(relay_parent, current);
|
||||
assert_eq!(validators.len(), 4);
|
||||
assert!(validators.contains(&test_state.validator_public[2]));
|
||||
assert!(validators.contains(&test_state.validator_public[0]));
|
||||
assert!(validators.contains(&test_state.validator_public[4]));
|
||||
assert!(validators.contains(&test_state.validator_public[1]));
|
||||
assert!(validators.iter().all(|p| test_state.validator_public.contains(p)));
|
||||
|
||||
let result = vec![
|
||||
Some(test_state.validator_authority_id[2].clone()),
|
||||
@@ -1267,4 +1258,161 @@ mod tests {
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/// This test ensures that we declare a collator at a validator by sending the `Declare` message as soon as the
|
||||
/// collator is aware of the validator being connected.
|
||||
#[test]
|
||||
fn collators_are_registered_correctly_at_validators() {
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(test_state.our_collator_pair.public(), |test_harness| async move {
|
||||
let mut virtual_overseer = test_harness.virtual_overseer;
|
||||
|
||||
let peer = test_state.validator_peer_id[0].clone();
|
||||
let validator_id = test_state.validator_authority_id[0].clone();
|
||||
|
||||
// Setup the system correctly
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::CollateOn(test_state.chain_ids[0]),
|
||||
).await;
|
||||
|
||||
overseer_signal(
|
||||
&mut virtual_overseer,
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated: smallvec![test_state.relay_parent],
|
||||
deactivated: smallvec![],
|
||||
}),
|
||||
).await;
|
||||
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::OurViewChange(View(vec![test_state.relay_parent])),
|
||||
),
|
||||
).await;
|
||||
|
||||
// A validator connected to us
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Authority),
|
||||
),
|
||||
).await;
|
||||
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerViewChange(peer.clone(), View(Default::default())),
|
||||
),
|
||||
).await;
|
||||
|
||||
// Now we want to distribute a PoVBlock
|
||||
let pov_block = PoV {
|
||||
block_data: BlockData(vec![42, 43, 44]),
|
||||
};
|
||||
|
||||
let pov_hash = pov_block.hash();
|
||||
|
||||
let candidate = TestCandidateBuilder {
|
||||
para_id: test_state.chain_ids[0],
|
||||
relay_parent: test_state.relay_parent,
|
||||
pov_hash,
|
||||
..Default::default()
|
||||
}.build();
|
||||
|
||||
overseer_send(
|
||||
&mut virtual_overseer,
|
||||
CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()),
|
||||
).await;
|
||||
|
||||
// obtain the availability cores.
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::AvailabilityCores(tx)
|
||||
)) => {
|
||||
assert_eq!(relay_parent, test_state.relay_parent);
|
||||
tx.send(Ok(test_state.availability_cores.clone())).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// Obtain the validator groups
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::ValidatorGroups(tx)
|
||||
)) => {
|
||||
assert_eq!(relay_parent, test_state.relay_parent);
|
||||
tx.send(Ok(test_state.validator_groups.clone())).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// obtain the validators per relay parent
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::Validators(tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, test_state.relay_parent);
|
||||
tx.send(Ok(test_state.validator_public.clone())).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// obtain the validator_id to authority_id mapping
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::ValidatorDiscovery(validators, tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, test_state.relay_parent);
|
||||
assert_eq!(validators.len(), 4);
|
||||
assert!(validators.iter().all(|v| test_state.validator_public.contains(&v)));
|
||||
|
||||
let result = vec![
|
||||
Some(test_state.validator_authority_id[2].clone()),
|
||||
Some(test_state.validator_authority_id[0].clone()),
|
||||
Some(test_state.validator_authority_id[4].clone()),
|
||||
Some(test_state.validator_authority_id[1].clone()),
|
||||
];
|
||||
tx.send(Ok(result)).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::ConnectToValidators {
|
||||
mut connected,
|
||||
..
|
||||
}
|
||||
) => {
|
||||
connected.try_send((validator_id, peer.clone())).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::SendCollationMessage(
|
||||
peer_id,
|
||||
msg,
|
||||
)
|
||||
) => {
|
||||
assert_matches!(
|
||||
msg,
|
||||
protocol_v1::CollationProtocol::CollatorProtocol(
|
||||
protocol_v1::CollatorProtocolMessage::Declare(collator_id),
|
||||
) if collator_id == test_state.our_collator_pair.public()
|
||||
);
|
||||
|
||||
assert_eq!(peer, peer_id[0]);
|
||||
}
|
||||
);
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user