diff --git a/polkadot/node/core/candidate-selection/src/lib.rs b/polkadot/node/core/candidate-selection/src/lib.rs index 4399a90f59..2a8aea80b0 100644 --- a/polkadot/node/core/candidate-selection/src/lib.rs +++ b/polkadot/node/core/candidate-selection/src/lib.rs @@ -187,8 +187,7 @@ impl CandidateSelectionJob { para_id, collator_id, )) => { - self.handle_collation(relay_parent, para_id, collator_id) - .await; + self.handle_collation(relay_parent, para_id, collator_id).await; } ToJob::CandidateSelection(CandidateSelectionMessage::Invalid( _, diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 3cf7808e36..da517902fd 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -563,11 +563,15 @@ async fn handle_validator_connected( ctx: &mut Context, state: &mut State, peer_id: PeerId, + validator_id: ValidatorId, ) -> Result<()> where Context: SubsystemContext { - if !state.peer_views.contains_key(&peer_id) { + // Check if the validator is already known or if maybe its peer id chaned(should not happen) + let unknown = state.known_validators.insert(validator_id, peer_id.clone()).map(|o| o != peer_id).unwrap_or(true); + + if unknown { // Only declare the new peers. declare(ctx, state, vec![peer_id.clone()]).await?; state.peer_views.insert(peer_id, Default::default()); @@ -651,8 +655,7 @@ where loop { if let Some(mut request) = state.last_connection_request.take() { while let Poll::Ready(Some((validator_id, peer_id))) = futures::poll!(request.next()) { - state.known_validators.insert(validator_id, peer_id.clone()); - if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id).await { + if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id, validator_id).await { warn!( target: TARGET, "Failed to declare our collator id: {:?}", @@ -698,6 +701,7 @@ mod tests { use polkadot_subsystem::ActiveLeavesUpdate; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_subsystem_testhelpers as test_helpers; + use polkadot_node_network_protocol::ObservedRole; #[derive(Default)] struct TestCandidateBuilder { @@ -889,9 +893,7 @@ mod tests { test_harness(test_state.our_collator_pair.public(), |test_harness| async move { let current = test_state.relay_parent; - let TestHarness { - mut virtual_overseer, - } = test_harness; + let mut virtual_overseer = test_harness.virtual_overseer; let pov_block = PoV { block_data: BlockData(vec![42, 43, 44]), @@ -976,10 +978,7 @@ mod tests { )) => { assert_eq!(relay_parent, current); assert_eq!(validators.len(), 4); - assert!(validators.contains(&test_state.validator_public[2])); - assert!(validators.contains(&test_state.validator_public[0])); - assert!(validators.contains(&test_state.validator_public[4])); - assert!(validators.contains(&test_state.validator_public[1])); + assert!(validators.iter().all(|v| test_state.validator_public.contains(&v))); let result = vec![ Some(test_state.validator_authority_id[2].clone()), @@ -1002,10 +1001,7 @@ mod tests { } ) => { assert_eq!(validator_ids.len(), 4); - assert!(validator_ids.contains(&test_state.validator_authority_id[2])); - assert!(validator_ids.contains(&test_state.validator_authority_id[0])); - assert!(validator_ids.contains(&test_state.validator_authority_id[4])); - assert!(validator_ids.contains(&test_state.validator_authority_id[1])); + assert!(validator_ids.iter().all(|id| test_state.validator_authority_id.contains(id))); let result = vec![ (test_state.validator_authority_id[2].clone(), test_state.validator_peer_id[2].clone()), @@ -1014,9 +1010,7 @@ mod tests { (test_state.validator_authority_id[1].clone(), test_state.validator_peer_id[1].clone()), ]; - for (id, peer_id) in result.into_iter() { - connected.try_send((id, peer_id)).unwrap(); - } + result.into_iter().for_each(|r| connected.try_send(r).unwrap()); } ); @@ -1251,10 +1245,7 @@ mod tests { )) => { assert_eq!(relay_parent, current); assert_eq!(validators.len(), 4); - assert!(validators.contains(&test_state.validator_public[2])); - assert!(validators.contains(&test_state.validator_public[0])); - assert!(validators.contains(&test_state.validator_public[4])); - assert!(validators.contains(&test_state.validator_public[1])); + assert!(validators.iter().all(|p| test_state.validator_public.contains(p))); let result = vec![ Some(test_state.validator_authority_id[2].clone()), @@ -1267,4 +1258,161 @@ mod tests { ); }); } + + /// This test ensures that we declare a collator at a validator by sending the `Declare` message as soon as the + /// collator is aware of the validator being connected. + #[test] + fn collators_are_registered_correctly_at_validators() { + let test_state = TestState::default(); + + test_harness(test_state.our_collator_pair.public(), |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let peer = test_state.validator_peer_id[0].clone(); + let validator_id = test_state.validator_authority_id[0].clone(); + + // Setup the system correctly + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::CollateOn(test_state.chain_ids[0]), + ).await; + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![test_state.relay_parent], + deactivated: smallvec![], + }), + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(View(vec![test_state.relay_parent])), + ), + ).await; + + // A validator connected to us + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Authority), + ), + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer.clone(), View(Default::default())), + ), + ).await; + + // Now we want to distribute a PoVBlock + let pov_block = PoV { + block_data: BlockData(vec![42, 43, 44]), + }; + + let pov_hash = pov_block.hash(); + + let candidate = TestCandidateBuilder { + para_id: test_state.chain_ids[0], + relay_parent: test_state.relay_parent, + pov_hash, + ..Default::default() + }.build(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()), + ).await; + + // obtain the availability cores. + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::AvailabilityCores(tx) + )) => { + assert_eq!(relay_parent, test_state.relay_parent); + tx.send(Ok(test_state.availability_cores.clone())).unwrap(); + } + ); + + // Obtain the validator groups + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::ValidatorGroups(tx) + )) => { + assert_eq!(relay_parent, test_state.relay_parent); + tx.send(Ok(test_state.validator_groups.clone())).unwrap(); + } + ); + + // obtain the validators per relay parent + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(tx), + )) => { + assert_eq!(relay_parent, test_state.relay_parent); + tx.send(Ok(test_state.validator_public.clone())).unwrap(); + } + ); + + // obtain the validator_id to authority_id mapping + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::ValidatorDiscovery(validators, tx), + )) => { + assert_eq!(relay_parent, test_state.relay_parent); + assert_eq!(validators.len(), 4); + assert!(validators.iter().all(|v| test_state.validator_public.contains(&v))); + + let result = vec![ + Some(test_state.validator_authority_id[2].clone()), + Some(test_state.validator_authority_id[0].clone()), + Some(test_state.validator_authority_id[4].clone()), + Some(test_state.validator_authority_id[1].clone()), + ]; + tx.send(Ok(result)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ConnectToValidators { + mut connected, + .. + } + ) => { + connected.try_send((validator_id, peer.clone())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + peer_id, + msg, + ) + ) => { + assert_matches!( + msg, + protocol_v1::CollationProtocol::CollatorProtocol( + protocol_v1::CollatorProtocolMessage::Declare(collator_id), + ) if collator_id == test_state.our_collator_pair.public() + ); + + assert_eq!(peer, peer_id[0]); + } + ); + }) + } }