diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 5a2d48003a..784a246b8b 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -14,15 +14,14 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use super::{LOG_TARGET, Result}; -use futures::{StreamExt, task::Poll}; +use futures::{StreamExt, select, FutureExt}; use polkadot_primitives::v1::{ - CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, - PoV, ValidatorId, + CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, }; use polkadot_subsystem::{ FromOverseer, OverseerSignal, SubsystemContext, @@ -120,6 +119,60 @@ impl metrics::Metrics for Metrics { } } +/// The group of validators that is assigned to our para at a given point of time. +/// +/// This structure is responsible for keeping track of which validators belong to a certain group for a para. It also +/// stores a mapping from [`PeerId`] to [`ValidatorId`] as we learn about it over the lifetime of this object. Besides +/// that it also keeps track to which validators we advertised our collation. +struct ValidatorGroup { + /// All [`ValidatorId`]'s that are assigned to us in this group. + validator_ids: HashSet, + /// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s from the + /// authority discovery. It is not ensured that this will contain *all* validators of this group. + peer_ids: HashMap, + /// All [`ValidatorId`]'s of the current group to that we advertised our collation. + advertised_to: HashSet, +} + +impl ValidatorGroup { + /// Returns `true` if we should advertise our collation to the given peer. + fn should_advertise_to(&self, peer: &PeerId) -> bool { + match self.peer_ids.get(peer) { + Some(validator_id) => !self.advertised_to.contains(validator_id), + None => false, + } + } + + /// Should be called after we advertised our collation to the given `peer` to keep track of it. + fn advertised_to_peer(&mut self, peer: &PeerId) { + if let Some(validator_id) = self.peer_ids.get(peer) { + self.advertised_to.insert(validator_id.clone()); + } + } + + /// Add a [`PeerId`] that belongs to the given [`ValidatorId`]. + /// + /// This returns `true` if the given validator belongs to this group and we could insert its [`PeerId`]. + fn add_peer_id_for_validator(&mut self, peer_id: &PeerId, validator_id: &ValidatorId) -> bool { + if !self.validator_ids.contains(validator_id) { + false + } else { + self.peer_ids.insert(peer_id.clone(), validator_id.clone()); + true + } + } +} + +impl From> for ValidatorGroup { + fn from(validator_ids: HashSet) -> Self { + Self { + validator_ids, + peer_ids: HashMap::new(), + advertised_to: HashSet::new(), + } + } +} + #[derive(Default)] struct State { /// Our id. @@ -141,24 +194,26 @@ struct State { /// We will keep up to one local collation per relay-parent. collations: HashMap, - /// Our validator groups active leafs. - our_validators_groups: HashMap>, + /// Our validator groups per active leaf. + our_validators_groups: HashMap, - /// Validators we know about via `ConnectToValidators` message. - /// - /// These are the only validators we are interested in talking to and as such - /// all actions from peers not in this map will be ignored. - /// Entries in this map will be cleared as validator groups in `our_validator_groups` - /// go out of scope with their respective deactivated leafs. - known_validators: HashMap, + /// List of peers where we declared ourself as a collator. + declared_at: HashSet, - /// Use to await for the next validator connection and revoke the request. - last_connection_request: Option, + /// The connection requests to validators per relay parent. + connection_requests: validator_discovery::ConnectionRequests, /// Metrics. metrics: Metrics, } +impl State { + /// Returns `true` if the given `peer` is interested in the leaf that is represented by `relay_parent`. + fn peer_interested_in_leaf(&self, peer: &PeerId, relay_parent: &Hash) -> bool { + self.peer_views.get(peer).map(|v| v.contains(relay_parent)).unwrap_or(false) + } +} + /// Distribute a collation. /// /// Figure out the core our para is assigned to and the relevant validators. @@ -168,16 +223,13 @@ struct State { /// as it must be invalid in that case - although this indicates a logic error /// elsewhere in the node. #[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))] -async fn distribute_collation( - ctx: &mut Context, +async fn distribute_collation( + ctx: &mut impl SubsystemContext, state: &mut State, id: ParaId, receipt: CandidateReceipt, pov: PoV, -) -> Result<()> -where - Context: SubsystemContext -{ +) -> Result<()> { let relay_parent = receipt.descriptor.relay_parent; // This collation is not in the active-leaves set. @@ -207,41 +259,28 @@ where relay_parent = %relay_parent, "looks like no core is assigned to {} at {}", id, relay_parent, ); - return Ok(()); + + return Ok(()) } }; // Determine the group on that core and the next group on that core. - let our_validators = match determine_our_validators(ctx, our_core, num_cores, relay_parent).await? { - Some(validators) => validators, - None => { - tracing::warn!( - target: LOG_TARGET, - core = ?our_core, - "there are no validators assigned to core", - ); + let (current_validators, next_validators) = determine_our_validators(ctx, our_core, num_cores, relay_parent).await?; - return Ok(()); - } - }; + if current_validators.is_empty() && next_validators.is_empty() { + tracing::warn!( + target: LOG_TARGET, + core = ?our_core, + "there are no validators assigned to core", + ); - state.our_validators_groups.insert(relay_parent, our_validators.clone()); - - // We may be already connected to some of the validators. In that case, - // advertise a collation to them right away. - for validator in our_validators.iter() { - if let Some(peer) = state.known_validators.get(&validator) { - if let Some(view) = state.peer_views.get(peer) { - if view.contains(&relay_parent) { - let peer = peer.clone(); - advertise_collation(ctx, state, relay_parent, vec![peer]).await?; - } - } - } + return Ok(()) } // Issue a discovery request for the validators of the current group and the next group. - connect_to_validators(ctx, relay_parent, state, our_validators).await?; + connect_to_validators(ctx, relay_parent, state, current_validators.union(&next_validators).cloned().collect()).await?; + + state.our_validators_groups.insert(relay_parent, current_validators.into()); state.collations.insert(relay_parent, (receipt, pov)); @@ -251,14 +290,11 @@ where /// Get the Id of the Core that is assigned to the para being collated on if any /// and the total number of cores. #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] -async fn determine_core( - ctx: &mut Context, +async fn determine_core( + ctx: &mut impl SubsystemContext, para_id: ParaId, relay_parent: Hash, -) -> Result> -where - Context: SubsystemContext -{ +) -> Result> { let cores = request_availability_cores_ctx(relay_parent, ctx).await?.await??; for (idx, core) in cores.iter().enumerate() { @@ -272,63 +308,46 @@ where Ok(None) } -/// Figure out a group of validators assigned to the para being collated on. +/// Figure out current and next group of validators assigned to the para being collated on. /// -/// This returns validators for the current group and the next group. +/// Returns [`ValidatorId`]'s of current and next group as determined based on the `relay_parent`. #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] -async fn determine_our_validators( - ctx: &mut Context, +async fn determine_our_validators( + ctx: &mut impl SubsystemContext, core_index: CoreIndex, cores: usize, relay_parent: Hash, -) -> Result>> -where - Context: SubsystemContext -{ +) -> Result<(HashSet, HashSet)> { let groups = request_validator_groups_ctx(relay_parent, ctx).await?; let groups = groups.await??; let current_group_index = groups.1.group_for_core(core_index, cores); - - let mut connect_to_validators = match groups.0.get(current_group_index.0 as usize) { - Some(group) => group.clone(), - None => return Ok(None), - }; + let current_validators = groups.0.get(current_group_index.0 as usize).map(|v| v.as_slice()).unwrap_or_default(); let next_group_idx = (current_group_index.0 as usize + 1) % groups.0.len(); + let next_validators = groups.0.get(next_group_idx).map(|v| v.as_slice()).unwrap_or_default(); - if let Some(next_group) = groups.0.get(next_group_idx) { - connect_to_validators.extend_from_slice(&next_group); - } + let validators = request_validators_ctx(relay_parent, ctx).await?.await??; - let validators = request_validators_ctx(relay_parent, ctx).await?; + let current_validators = current_validators.iter().map(|i| validators[*i as usize].clone()).collect(); + let next_validators = next_validators.iter().map(|i| validators[*i as usize].clone()).collect(); - let validators = validators.await??; - - let validators = connect_to_validators - .into_iter() - .map(|idx| validators[idx as usize].clone()) - .collect(); - - Ok(Some(validators)) + Ok((current_validators, next_validators)) } -/// Issue a `Declare` collation message to a set of peers. +/// Issue a `Declare` collation message to the given `peer`. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn declare( - ctx: &mut Context, +async fn declare( + ctx: &mut impl SubsystemContext, state: &mut State, - to: Vec, -) -> Result<()> -where - Context: SubsystemContext -{ + peer: PeerId, +) -> Result<()> { let wire_message = protocol_v1::CollatorProtocolMessage::Declare(state.our_id.clone()); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendCollationMessage( - to, + vec![peer], protocol_v1::CollationProtocol::CollatorProtocol(wire_message), ) )).await?; @@ -339,41 +358,34 @@ where /// Issue a connection request to a set of validators and /// revoke the previous connection request. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn connect_to_validators( - ctx: &mut Context, +async fn connect_to_validators( + ctx: &mut impl SubsystemContext, relay_parent: Hash, state: &mut State, validators: Vec, -) -> Result<()> -where - Context: SubsystemContext -{ - if let Some(request) = state.last_connection_request.take() { - drop(request); - } - +) -> Result<()> { let request = validator_discovery::connect_to_validators( ctx, relay_parent, validators, ).await?; - state.last_connection_request = Some(request); + state.connection_requests.put(relay_parent, request); Ok(()) } -/// Advertise collation to a set of relay chain validators. +/// Advertise collation to the given `peer`. +/// +/// This will only advertise a collation if there exists one for the given `relay_parent` and the given `peer` is +/// set as validator for our para at the given `relay_parent`. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn advertise_collation( - ctx: &mut Context, +async fn advertise_collation( + ctx: &mut impl SubsystemContext, state: &mut State, relay_parent: Hash, - to: Vec, -) -> Result<()> -where - Context: SubsystemContext -{ + peer: PeerId, +) -> Result<()> { let collating_on = match state.collating_on { Some(collating_on) => collating_on, None => { @@ -381,15 +393,28 @@ where } }; + let should_advertise = state.our_validators_groups + .get(&relay_parent) + .map(|g| g.should_advertise_to(&peer)) + .unwrap_or(false); + + if !state.collations.contains_key(&relay_parent) || !should_advertise { + return Ok(()) + } + let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation(relay_parent, collating_on); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendCollationMessage( - to, + vec![peer.clone()], protocol_v1::CollationProtocol::CollatorProtocol(wire_message), ) )).await?; + if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) { + validators.advertised_to_peer(&peer); + } + state.metrics.on_advertisment_made(); Ok(()) @@ -397,14 +422,11 @@ where /// The main incoming message dispatching switch. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn process_msg( - ctx: &mut Context, +async fn process_msg( + ctx: &mut impl SubsystemContext, state: &mut State, msg: CollatorProtocolMessage, -) -> Result<()> -where - Context: SubsystemContext -{ +) -> Result<()> { use CollatorProtocolMessage::*; let _timer = state.metrics.time_process_msg(); @@ -475,17 +497,14 @@ where /// Issue a response to a previously requested collation. #[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))] -async fn send_collation( - ctx: &mut Context, +async fn send_collation( + ctx: &mut impl SubsystemContext, state: &mut State, request_id: RequestId, origin: PeerId, receipt: CandidateReceipt, pov: PoV, -) -> Result<()> -where - Context: SubsystemContext -{ +) -> Result<()> { let wire_message = protocol_v1::CollatorProtocolMessage::Collation( request_id, receipt, @@ -506,15 +525,12 @@ where /// A networking messages switch. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn handle_incoming_peer_message( - ctx: &mut Context, +async fn handle_incoming_peer_message( + ctx: &mut impl SubsystemContext, state: &mut State, origin: PeerId, msg: protocol_v1::CollatorProtocolMessage, -) -> Result<()> -where - Context: SubsystemContext -{ +) -> Result<()> { use protocol_v1::CollatorProtocolMessage::*; match msg { @@ -568,15 +584,12 @@ where /// Our view has changed. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn handle_peer_view_change( - ctx: &mut Context, +async fn handle_peer_view_change( + ctx: &mut impl SubsystemContext, state: &mut State, peer_id: PeerId, view: View, -) -> Result<()> -where - Context: SubsystemContext -{ +) -> Result<()> { let current = state.peer_views.entry(peer_id.clone()).or_default(); let added: Vec = view.difference(&*current).cloned().collect(); @@ -584,9 +597,7 @@ where *current = view; for added in added.into_iter() { - if state.collations.contains_key(&added) { - advertise_collation(ctx, state, added.clone(), vec![peer_id.clone()]).await?; - } + advertise_collation(ctx, state, added, peer_id.clone()).await?; } Ok(()) @@ -596,22 +607,30 @@ where /// /// `Declare` that we are a collator with a given `CollatorId`. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn handle_validator_connected( - ctx: &mut Context, +async fn handle_validator_connected( + ctx: &mut impl SubsystemContext, state: &mut State, peer_id: PeerId, validator_id: ValidatorId, -) -> Result<()> -where - Context: SubsystemContext -{ - // Check if the validator is already known or if maybe its peer id chaned(should not happen) - let unknown = state.known_validators.insert(validator_id, peer_id.clone()).map(|o| o != peer_id).unwrap_or(true); + relay_parent: Hash, +) -> Result<()> { + let not_declared = state.declared_at.insert(peer_id.clone()); - if unknown { - // Only declare the new peers. - declare(ctx, state, vec![peer_id.clone()]).await?; - state.peer_views.insert(peer_id, Default::default()); + if not_declared { + declare(ctx, state, peer_id.clone()).await?; + } + + // Store the PeerId and find out if we should advertise to this peer. + // + // If this peer does not belong to the para validators, we also don't need to try to advertise our collation. + let advertise = if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) { + validators.add_peer_id_for_validator(&peer_id, &validator_id) + } else { + false + }; + + if advertise && state.peer_interested_in_leaf(&peer_id, &relay_parent) { + advertise_collation(ctx, state, relay_parent, peer_id).await?; } Ok(()) @@ -619,14 +638,11 @@ where /// Bridge messages switch. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn handle_network_msg( - ctx: &mut Context, +async fn handle_network_msg( + ctx: &mut impl SubsystemContext, state: &mut State, bridge_message: NetworkBridgeEvent, -) -> Result<()> -where - Context: SubsystemContext -{ +) -> Result<()> { use NetworkBridgeEvent::*; match bridge_message { @@ -638,8 +654,8 @@ where handle_peer_view_change(ctx, state, peer_id, view).await?; } PeerDisconnected(peer_id) => { - state.known_validators.retain(|_, v| *v != peer_id); state.peer_views.remove(&peer_id); + state.declared_at.remove(&peer_id); } OurViewChange(view) => { handle_our_view_change(state, view).await?; @@ -658,59 +674,59 @@ async fn handle_our_view_change( state: &mut State, view: View, ) -> Result<()> { - let old_view = std::mem::replace(&mut (state.view), view); - - let view = state.view.clone(); - - let removed = old_view.difference(&view).collect::>(); - - for removed in removed.into_iter() { + for removed in state.view.difference(&view) { state.collations.remove(removed); state.our_validators_groups.remove(removed); + state.connection_requests.remove(removed); } + state.view = view; + Ok(()) } /// The collator protocol collator side main loop. #[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))] -pub(crate) async fn run( - mut ctx: Context, +pub(crate) async fn run( + mut ctx: impl SubsystemContext, our_id: CollatorId, metrics: Metrics, -) -> Result<()> -where - Context: SubsystemContext -{ +) -> Result<()> { use FromOverseer::*; use OverseerSignal::*; let mut state = State { metrics, + our_id, ..Default::default() }; - state.our_id = our_id; - loop { - if let Some(mut request) = state.last_connection_request.take() { - let _timer = state.metrics.time_handle_connection_request(); + select! { + res = state.connection_requests.next().fuse() => { + let (relay_parent, validator_id, peer_id) = match res { + Some(res) => res, + // Will never happen, but better to be safe. + None => continue, + }; - while let Poll::Ready(Some((validator_id, peer_id))) = futures::poll!(request.next()) { - if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id, validator_id).await { + let _timer = state.metrics.time_handle_connection_request(); + + if let Err(err) = handle_validator_connected( + &mut ctx, + &mut state, + peer_id, + validator_id, + relay_parent, + ).await { tracing::warn!( target: LOG_TARGET, err = ?err, "Failed to declare our collator id", ); } - } - // put it back - state.last_connection_request = Some(request); - } - - while let Poll::Ready(msg) = futures::poll!(ctx.recv()) { - match msg? { + }, + msg = ctx.recv().fuse() => match msg? { Communication { msg } => { if let Err(e) = process_msg(&mut ctx, &mut state, msg).await { tracing::warn!(target: LOG_TARGET, err = ?e, "Failed to process message"); @@ -721,8 +737,6 @@ where Signal(Conclude) => return Ok(()), } } - - futures::pending!() } } @@ -733,7 +747,7 @@ mod tests { use std::time::Duration; use assert_matches::assert_matches; - use futures::{executor, future, Future}; + use futures::{executor, future, Future, channel::mpsc}; use smallvec::smallvec; use sp_core::crypto::Pair; @@ -746,7 +760,6 @@ mod tests { use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}}; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_subsystem_testhelpers as test_helpers; - use polkadot_node_network_protocol::ObservedRole; #[derive(Default)] struct TestCandidateBuilder { @@ -772,14 +785,14 @@ mod tests { #[derive(Clone)] struct TestState { - chain_ids: Vec, + para_id: ParaId, validators: Vec, validator_public: Vec, validator_authority_id: Vec, validator_peer_id: Vec, validator_groups: (Vec>, GroupRotationInfo), relay_parent: Hash, - availability_cores: Vec, + availability_core: CoreState, our_collator_pair: CollatorPair, } @@ -793,10 +806,7 @@ mod tests { impl Default for TestState { fn default() -> Self { - let chain_a = ParaId::from(1); - let chain_b = ParaId::from(2); - - let chain_ids = vec![chain_a, chain_b]; + let para_id = ParaId::from(1); let validators = vec![ Sr25519Keyring::Alice, @@ -813,7 +823,7 @@ mod tests { .take(validator_public.len()) .collect(); - let validator_groups = vec![vec![2, 0, 4], vec![1], vec![3]]; + let validator_groups = vec![vec![2, 0, 4], vec![3, 2, 4]]; let group_rotation_info = GroupRotationInfo { session_start_block: 0, group_rotation_frequency: 100, @@ -821,37 +831,107 @@ mod tests { }; let validator_groups = (validator_groups, group_rotation_info); - let availability_cores = vec![ - CoreState::Scheduled(ScheduledCore { - para_id: chain_ids[0], - collator: None, - }), - CoreState::Scheduled(ScheduledCore { - para_id: chain_ids[1], - collator: None, - }), - ]; + let availability_core = CoreState::Scheduled(ScheduledCore { + para_id, + collator: None, + }); - let relay_parent = Hash::repeat_byte(0x05); + let relay_parent = Hash::random(); let our_collator_pair = CollatorPair::generate().0; Self { - chain_ids, + para_id, validators, validator_public, validator_authority_id, validator_peer_id, validator_groups, relay_parent, - availability_cores, + availability_core, our_collator_pair, } } } + impl TestState { + fn current_group_validator_indices(&self) -> &[ValidatorIndex] { + &self.validator_groups.0[0] + } + + fn current_group_validator_peer_ids(&self) -> Vec { + self.current_group_validator_indices().iter().map(|i| self.validator_peer_id[*i as usize].clone()).collect() + } + + fn current_group_validator_authority_ids(&self) -> Vec { + self.current_group_validator_indices() + .iter() + .map(|i| self.validator_authority_id[*i as usize].clone()) + .collect() + } + + fn current_group_validator_ids(&self) -> Vec { + self.current_group_validator_indices() + .iter() + .map(|i| self.validator_public[*i as usize].clone()) + .collect() + } + + fn next_group_validator_indices(&self) -> &[ValidatorIndex] { + &self.validator_groups.0[1] + } + + fn next_group_validator_authority_ids(&self) -> Vec { + self.next_group_validator_indices() + .iter() + .map(|i| self.validator_authority_id[*i as usize].clone()) + .collect() + } + + fn next_group_validator_ids(&self) -> Vec { + self.next_group_validator_indices() + .iter() + .map(|i| self.validator_public[*i as usize].clone()) + .collect() + } + + /// Returns the unique count of validators in the current and next group. + fn current_and_next_group_unique_validator_count(&self) -> usize { + let mut indices = self.next_group_validator_indices().iter().collect::>(); + indices.extend(self.current_group_validator_indices()); + indices.len() + } + + /// Generate a new relay parent and inform the subsystem about the new view. + /// + /// If `merge_views == true` it means the subsystem will be informed that we working on the old `relay_parent` + /// and the new one. + async fn advance_to_new_round(&mut self, virtual_overseer: &mut VirtualOverseer, merge_views: bool) { + let old_relay_parent = self.relay_parent; + + while self.relay_parent == old_relay_parent { + self.relay_parent.randomize(); + } + + let hashes = if merge_views { + vec![old_relay_parent, self.relay_parent] + } else { + vec![self.relay_parent] + }; + + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(View(hashes)), + ), + ).await; + } + } + + type VirtualOverseer = test_helpers::TestSubsystemContextHandle; + struct TestHarness { - virtual_overseer: test_helpers::TestSubsystemContextHandle, + virtual_overseer: VirtualOverseer, } fn test_harness>( @@ -932,190 +1012,256 @@ mod tests { .expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT)); } + // Setup the system by sending the `CollateOn`, `ActiveLeaves` and `OurViewChange` messages. + async fn setup_system(virtual_overseer: &mut VirtualOverseer, test_state: &TestState) { + overseer_send( + virtual_overseer, + CollatorProtocolMessage::CollateOn(test_state.para_id), + ).await; + + overseer_signal( + virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![test_state.relay_parent], + deactivated: smallvec![], + }), + ).await; + + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(View(vec![test_state.relay_parent])), + ), + ).await; + } + + /// Result of [`distribute_collation`] + struct DistributeCollation { + /// Should be used to inform the subsystem about connected validators. + connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, + candidate: CandidateReceipt, + pov_block: PoV, + } + + /// Create some PoV and distribute it. + async fn distribute_collation( + virtual_overseer: &mut VirtualOverseer, + test_state: &TestState, + ) -> DistributeCollation { + // Now we want to distribute a PoVBlock + let pov_block = PoV { + block_data: BlockData(vec![42, 43, 44]), + }; + + let pov_hash = pov_block.hash(); + + let candidate = TestCandidateBuilder { + para_id: test_state.para_id, + relay_parent: test_state.relay_parent, + pov_hash, + ..Default::default() + }.build(); + + overseer_send( + virtual_overseer, + CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()), + ).await; + + // obtain the availability cores. + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::AvailabilityCores(tx) + )) => { + assert_eq!(relay_parent, test_state.relay_parent); + tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap(); + } + ); + + // Obtain the validator groups + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::ValidatorGroups(tx) + )) => { + assert_eq!(relay_parent, test_state.relay_parent); + tx.send(Ok(test_state.validator_groups.clone())).unwrap(); + } + ); + + // obtain the validators per relay parent + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(tx), + )) => { + assert_eq!(relay_parent, test_state.relay_parent); + tx.send(Ok(test_state.validator_public.clone())).unwrap(); + } + ); + + // obtain the validator_id to authority_id mapping + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::ValidatorDiscovery(validators, tx), + )) => { + assert_eq!(relay_parent, test_state.relay_parent); + assert_eq!(validators.len(), test_state.current_and_next_group_unique_validator_count()); + + let current_validators = test_state.current_group_validator_ids(); + let next_validators = test_state.next_group_validator_ids(); + + assert!(validators.iter().all(|v| current_validators.contains(&v) || next_validators.contains(&v))); + + let current_validators = test_state.current_group_validator_authority_ids(); + let next_validators = test_state.next_group_validator_authority_ids(); + + tx.send(Ok(current_validators.into_iter().chain(next_validators).map(Some).collect())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ConnectToValidators { + connected, + .. + } + ) => { + DistributeCollation { + connected, + candidate, + pov_block, + } + } + ) + } + + /// Connect a peer + async fn connect_peer(virtual_overseer: &mut VirtualOverseer, peer: PeerId) { + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer.clone(), + polkadot_node_network_protocol::ObservedRole::Authority, + ), + ), + ).await; + + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer, View(Default::default())), + ), + ).await; + } + + /// Disconnect a peer + async fn disconnect_peer(virtual_overseer: &mut VirtualOverseer, peer: PeerId) { + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerDisconnected(peer)), + ).await; + } + + /// Check that the next received message is a `Declare` message. + async fn expect_declare_msg(virtual_overseer: &mut VirtualOverseer, test_state: &TestState, peer: &PeerId) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + to, + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + ) => { + assert_eq!(to[0], *peer); + assert_matches!( + wire_message, + protocol_v1::CollatorProtocolMessage::Declare(collator_id) => { + assert_eq!(collator_id, test_state.our_collator_pair.public()); + } + ); + } + ); + } + + /// Check that the next received message is a collation advertisment message. + async fn expect_advertise_collation_msg( + virtual_overseer: &mut VirtualOverseer, + test_state: &TestState, + peer: &PeerId, + expected_relay_parent: Hash, + ) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + to, + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + ) => { + assert_eq!(to[0], *peer); + assert_matches!( + wire_message, + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + relay_parent, + collating_on, + ) => { + assert_eq!(relay_parent, expected_relay_parent); + assert_eq!(collating_on, test_state.para_id); + } + ); + } + ); + } + + /// Send a message that the given peer's view changed. + async fn send_peer_view_change(virtual_overseer: &mut VirtualOverseer, peer: &PeerId, hashes: Vec) { + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer.clone(), View(hashes)), + ), + ).await; + } + #[test] fn advertise_and_send_collation() { - let test_state = TestState::default(); + let mut test_state = TestState::default(); test_harness(test_state.our_collator_pair.public(), |test_harness| async move { - let current = test_state.relay_parent; let mut virtual_overseer = test_harness.virtual_overseer; - let pov_block = PoV { - block_data: BlockData(vec![42, 43, 44]), - }; + setup_system(&mut virtual_overseer, &test_state).await; - let pov_hash = pov_block.hash(); - - let candidate = TestCandidateBuilder { - para_id: test_state.chain_ids[0], - relay_parent: test_state.relay_parent, - pov_hash, - ..Default::default() - }.build(); - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::CollateOn(test_state.chain_ids[0]) - ).await; - - overseer_signal( - &mut virtual_overseer, - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![current.clone()], - deactivated: smallvec![], - }), - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(View(vec![current])), - ), - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()), - ).await; - - // obtain the availability cores. - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::AvailabilityCores(tx) - )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(test_state.availability_cores.clone())).unwrap(); - } - ); - - // Obtain the validator groups - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::ValidatorGroups(tx) - )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(test_state.validator_groups.clone())).unwrap(); - } - ); - - // obtain the validators per relay parent - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Validators(tx), - )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(test_state.validator_public.clone())).unwrap(); - } - ); - - // obtain the validator_id to authority_id mapping - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::ValidatorDiscovery(validators, tx), - )) => { - assert_eq!(relay_parent, current); - assert_eq!(validators.len(), 4); - assert!(validators.iter().all(|v| test_state.validator_public.contains(&v))); - - let result = vec![ - Some(test_state.validator_authority_id[2].clone()), - Some(test_state.validator_authority_id[0].clone()), - Some(test_state.validator_authority_id[4].clone()), - Some(test_state.validator_authority_id[1].clone()), - ]; - tx.send(Ok(result)).unwrap(); - } - ); - - // We now should connect to our validator group. - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ConnectToValidators { - validator_ids, - mut connected, - .. - } - ) => { - assert_eq!(validator_ids.len(), 4); - assert!(validator_ids.iter().all(|id| test_state.validator_authority_id.contains(id))); - - let result = vec![ - (test_state.validator_authority_id[2].clone(), test_state.validator_peer_id[2].clone()), - (test_state.validator_authority_id[0].clone(), test_state.validator_peer_id[0].clone()), - (test_state.validator_authority_id[4].clone(), test_state.validator_peer_id[4].clone()), - (test_state.validator_authority_id[1].clone(), test_state.validator_peer_id[1].clone()), - ]; - - result.into_iter().for_each(|r| connected.try_send(r).unwrap()); - } - ); + let DistributeCollation { mut connected, candidate, pov_block } = + distribute_collation(&mut virtual_overseer, &test_state).await; + test_state.current_group_validator_authority_ids() + .into_iter() + .zip(test_state.current_group_validator_peer_ids()) + .for_each(|r| connected.try_send(r).unwrap()); // We declare to the connected validators that we are a collator. // We need to catch all `Declare` messages to the validators we've // previosly connected to. - for i in vec![2, 0, 4, 1].into_iter() { - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( - to, - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), - ) - ) => { - assert_eq!(to, vec![test_state.validator_peer_id[i].clone()]); - assert_matches!( - wire_message, - protocol_v1::CollatorProtocolMessage::Declare(collator_id) => { - assert_eq!(collator_id, test_state.our_collator_pair.public()); - } - ); - } - ); + for peer_id in test_state.current_group_validator_peer_ids() { + expect_declare_msg(&mut virtual_overseer, &test_state, &peer_id).await; } + let peer = test_state.current_group_validator_peer_ids()[0].clone(); + // Send info about peer's view. - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerViewChange( - test_state.validator_peer_id[2].clone(), - View(vec![current]), - ) - ) - ).await; + send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await; // The peer is interested in a leaf that we have a collation for; // advertise it. - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( - to, - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), - ) - ) => { - assert_eq!(to, vec![test_state.validator_peer_id[2].clone()]); - assert_matches!( - wire_message, - protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - relay_parent, - collating_on, - ) => { - assert_eq!(relay_parent, current); - assert_eq!(collating_on, test_state.chain_ids[0]); - } - ); - } - ); + expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await; let request_id = 42; @@ -1124,11 +1270,11 @@ mod tests { &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerMessage( - test_state.validator_peer_id[2].clone(), + peer.clone(), protocol_v1::CollatorProtocolMessage::RequestCollation( request_id, - current, - test_state.chain_ids[0], + test_state.relay_parent, + test_state.para_id, ) ) ) @@ -1143,7 +1289,7 @@ mod tests { protocol_v1::CollationProtocol::CollatorProtocol(wire_message), ) ) => { - assert_eq!(to, vec![test_state.validator_peer_id[2].clone()]); + assert_eq!(to, vec![peer]); assert_matches!( wire_message, protocol_v1::CollatorProtocolMessage::Collation(req_id, receipt, pov) => { @@ -1155,28 +1301,21 @@ mod tests { } ); - let new_head = Hash::repeat_byte(0xA); + let old_relay_parent = test_state.relay_parent; + test_state.advance_to_new_round(&mut virtual_overseer, false).await; - // Collator's view moves on. - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(View(vec![new_head])), - ), - ).await; - - let request_id = 43; + let peer = test_state.validator_peer_id[2].clone(); // Re-request a collation. overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerMessage( - test_state.validator_peer_id[2].clone(), + peer.clone(), protocol_v1::CollatorProtocolMessage::RequestCollation( - request_id, - current, - test_state.chain_ids[0], + 43, + old_relay_parent, + test_state.para_id, ) ) ) @@ -1184,123 +1323,25 @@ mod tests { assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none()); - let pov_block = PoV { - block_data: BlockData(vec![45, 46, 47]), - }; - - let pov_hash = pov_block.hash(); - let current = Hash::repeat_byte(33); - - let candidate = TestCandidateBuilder { - para_id: test_state.chain_ids[0], - relay_parent: current, - pov_hash, - ..Default::default() - }.build(); - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(View(vec![current])), - ), - ).await; + let DistributeCollation { mut connected, .. } = + distribute_collation(&mut virtual_overseer, &test_state).await; + test_state.current_group_validator_authority_ids() + .into_iter() + .zip(test_state.current_group_validator_peer_ids()) + .for_each(|r| connected.try_send(r).unwrap()); // Send info about peer's view. overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerViewChange( - test_state.validator_peer_id[2].clone(), - View(vec![current]), + peer.clone(), + View(vec![test_state.relay_parent]), ) ) ).await; - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()), - ).await; - - // obtain the availability cores. - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::AvailabilityCores(tx) - )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(test_state.availability_cores.clone())).unwrap(); - } - ); - - // Obtain the validator groups - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::ValidatorGroups(tx) - )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(test_state.validator_groups.clone())).unwrap(); - } - ); - - // obtain the validators per relay parent - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Validators(tx), - )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(test_state.validator_public.clone())).unwrap(); - } - ); - - // The peer is interested in a leaf that we have a collation for; - // advertise it. - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( - to, - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), - ) - ) => { - assert_eq!(to, vec![test_state.validator_peer_id[2].clone()]); - assert_matches!( - wire_message, - protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - relay_parent, - collating_on, - ) => { - assert_eq!(relay_parent, current); - assert_eq!(collating_on, test_state.chain_ids[0]); - } - ); - } - ); - - // obtain the validator_id to authority_id mapping - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::ValidatorDiscovery(validators, tx), - )) => { - assert_eq!(relay_parent, current); - assert_eq!(validators.len(), 4); - assert!(validators.iter().all(|p| test_state.validator_public.contains(p))); - - let result = vec![ - Some(test_state.validator_authority_id[2].clone()), - Some(test_state.validator_authority_id[0].clone()), - Some(test_state.validator_authority_id[4].clone()), - Some(test_state.validator_authority_id[1].clone()), - ]; - tx.send(Ok(result)).unwrap(); - } - ); + expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await; }); } @@ -1316,148 +1357,132 @@ mod tests { let peer = test_state.validator_peer_id[0].clone(); let validator_id = test_state.validator_authority_id[0].clone(); - // Setup the system correctly - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::CollateOn(test_state.chain_ids[0]), - ).await; - - overseer_signal( - &mut virtual_overseer, - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![test_state.relay_parent], - deactivated: smallvec![], - }), - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(View(vec![test_state.relay_parent])), - ), - ).await; + setup_system(&mut virtual_overseer, &test_state).await; // A validator connected to us - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Authority), - ), - ).await; + connect_peer(&mut virtual_overseer, peer.clone()).await; - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerViewChange(peer.clone(), View(Default::default())), - ), - ).await; + let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected; + connected.try_send((validator_id, peer.clone())).unwrap(); - // Now we want to distribute a PoVBlock - let pov_block = PoV { - block_data: BlockData(vec![42, 43, 44]), - }; + expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await; + }) + } - let pov_hash = pov_block.hash(); + #[test] + fn collations_are_only_advertised_to_validators_with_correct_view() { + let test_state = TestState::default(); - let candidate = TestCandidateBuilder { - para_id: test_state.chain_ids[0], - relay_parent: test_state.relay_parent, - pov_hash, - ..Default::default() - }.build(); + test_harness(test_state.our_collator_pair.public(), |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()), - ).await; + let peer = test_state.current_group_validator_peer_ids()[0].clone(); + let validator_id = test_state.current_group_validator_authority_ids()[0].clone(); - // obtain the availability cores. - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::AvailabilityCores(tx) - )) => { - assert_eq!(relay_parent, test_state.relay_parent); - tx.send(Ok(test_state.availability_cores.clone())).unwrap(); - } - ); + let peer2 = test_state.current_group_validator_peer_ids()[1].clone(); + let validator_id2 = test_state.current_group_validator_authority_ids()[1].clone(); - // Obtain the validator groups - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::ValidatorGroups(tx) - )) => { - assert_eq!(relay_parent, test_state.relay_parent); - tx.send(Ok(test_state.validator_groups.clone())).unwrap(); - } - ); + setup_system(&mut virtual_overseer, &test_state).await; - // obtain the validators per relay parent - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Validators(tx), - )) => { - assert_eq!(relay_parent, test_state.relay_parent); - tx.send(Ok(test_state.validator_public.clone())).unwrap(); - } - ); + // A validator connected to us + connect_peer(&mut virtual_overseer, peer.clone()).await; - // obtain the validator_id to authority_id mapping - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::ValidatorDiscovery(validators, tx), - )) => { - assert_eq!(relay_parent, test_state.relay_parent); - assert_eq!(validators.len(), 4); - assert!(validators.iter().all(|v| test_state.validator_public.contains(&v))); + // Connect the second validator + connect_peer(&mut virtual_overseer, peer2.clone()).await; - let result = vec![ - Some(test_state.validator_authority_id[2].clone()), - Some(test_state.validator_authority_id[0].clone()), - Some(test_state.validator_authority_id[4].clone()), - Some(test_state.validator_authority_id[1].clone()), - ]; - tx.send(Ok(result)).unwrap(); - } - ); + // And let it tell us that it is has the same view. + send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ConnectToValidators { - mut connected, - .. - } - ) => { - connected.try_send((validator_id, peer.clone())).unwrap(); - } - ); + let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected; + connected.try_send((validator_id, peer.clone())).unwrap(); + connected.try_send((validator_id2, peer2.clone())).unwrap(); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( - peer_id, - msg, - ) - ) => { - assert_matches!( - msg, - protocol_v1::CollationProtocol::CollatorProtocol( - protocol_v1::CollatorProtocolMessage::Declare(collator_id), - ) if collator_id == test_state.our_collator_pair.public() - ); + expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await; + expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await; - assert_eq!(peer, peer_id[0]); - } - ); + expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer2, test_state.relay_parent).await; + + // The other validator announces that it changed its view. + send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await; + + // After changing the view we should receive the advertisement + expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await; + }) + } + + #[test] + fn collate_on_two_different_relay_chain_blocks() { + let mut test_state = TestState::default(); + + test_harness(test_state.our_collator_pair.public(), |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let peer = test_state.current_group_validator_peer_ids()[0].clone(); + let validator_id = test_state.current_group_validator_authority_ids()[0].clone(); + + let peer2 = test_state.current_group_validator_peer_ids()[1].clone(); + let validator_id2 = test_state.current_group_validator_authority_ids()[1].clone(); + + setup_system(&mut virtual_overseer, &test_state).await; + + // A validator connected to us + connect_peer(&mut virtual_overseer, peer.clone()).await; + + // Connect the second validator + connect_peer(&mut virtual_overseer, peer2.clone()).await; + + let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected; + connected.try_send((validator_id.clone(), peer.clone())).unwrap(); + connected.try_send((validator_id2.clone(), peer2.clone())).unwrap(); + + expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await; + expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await; + + let old_relay_parent = test_state.relay_parent; + + // Advance to a new round, while informing the subsystem that the old and the new relay parent are active. + test_state.advance_to_new_round(&mut virtual_overseer, true).await; + + let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected; + connected.try_send((validator_id, peer.clone())).unwrap(); + connected.try_send((validator_id2, peer2.clone())).unwrap(); + + send_peer_view_change(&mut virtual_overseer, &peer, vec![old_relay_parent]).await; + expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, old_relay_parent).await; + + send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await; + expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer2, test_state.relay_parent).await; + }) + } + + #[test] + fn validator_reconnect_does_not_advertise_a_second_time() { + let test_state = TestState::default(); + + test_harness(test_state.our_collator_pair.public(), |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let peer = test_state.current_group_validator_peer_ids()[0].clone(); + let validator_id = test_state.current_group_validator_authority_ids()[0].clone(); + + setup_system(&mut virtual_overseer, &test_state).await; + + // A validator connected to us + connect_peer(&mut virtual_overseer, peer.clone()).await; + + let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected; + connected.try_send((validator_id.clone(), peer.clone())).unwrap(); + + expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await; + send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await; + expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await; + + // Disconnect and reconnect directly + disconnect_peer(&mut virtual_overseer, peer.clone()).await; + connect_peer(&mut virtual_overseer, peer.clone()).await; + send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await; + + assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none()); }) } } diff --git a/polkadot/node/network/collator-protocol/src/lib.rs b/polkadot/node/network/collator-protocol/src/lib.rs index 5909c0b8e6..73ed2c956b 100644 --- a/polkadot/node/network/collator-protocol/src/lib.rs +++ b/polkadot/node/network/collator-protocol/src/lib.rs @@ -18,6 +18,7 @@ //! This subsystem implements both sides of the collator protocol. #![deny(missing_docs, unused_crate_dependencies)] +#![recursion_limit="256"] use std::time::Duration; use futures::{channel::oneshot, FutureExt, TryFutureExt};