From b9a9e836caf01173c6a61e4561952aeabdf882dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Sun, 22 Nov 2020 12:55:05 +0100 Subject: [PATCH] Improve collator side of the collator-protocol (#1955) * Improve collator side of the collator-protocol This pr improves the collator-protocol implementation of the collator side. Besides cleaning up code and rewriting it, the following changed: - Before on `PeerViewChange` we send an advertisment to every peer, now this only happens for validators. - It also adds a check that we send an advertisment message only once for a connected peer. - If the same validator was part of the current and next group, we requested to be connected to this validator two times. This is also fixed now. - Instead of having only one connection request, we now are being able to store multiple of them. This is required as we can have multiple active leafs at any point of time. * Switch to common `ConnectionRequests` * Update node/network/collator-protocol/src/collator_side.rs --- .../collator-protocol/src/collator_side.rs | 1271 +++++++++-------- .../node/network/collator-protocol/src/lib.rs | 1 + 2 files changed, 649 insertions(+), 623 deletions(-) diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 5a2d48003a..784a246b8b 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -14,15 +14,14 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use super::{LOG_TARGET, Result}; -use futures::{StreamExt, task::Poll}; +use futures::{StreamExt, select, FutureExt}; use polkadot_primitives::v1::{ - CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, - PoV, ValidatorId, + CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, }; use polkadot_subsystem::{ FromOverseer, OverseerSignal, SubsystemContext, @@ -120,6 +119,60 @@ impl metrics::Metrics for Metrics { } } +/// The group of validators that is assigned to our para at a given point of time. +/// +/// This structure is responsible for keeping track of which validators belong to a certain group for a para. It also +/// stores a mapping from [`PeerId`] to [`ValidatorId`] as we learn about it over the lifetime of this object. Besides +/// that it also keeps track to which validators we advertised our collation. +struct ValidatorGroup { + /// All [`ValidatorId`]'s that are assigned to us in this group. + validator_ids: HashSet, + /// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s from the + /// authority discovery. It is not ensured that this will contain *all* validators of this group. + peer_ids: HashMap, + /// All [`ValidatorId`]'s of the current group to that we advertised our collation. + advertised_to: HashSet, +} + +impl ValidatorGroup { + /// Returns `true` if we should advertise our collation to the given peer. + fn should_advertise_to(&self, peer: &PeerId) -> bool { + match self.peer_ids.get(peer) { + Some(validator_id) => !self.advertised_to.contains(validator_id), + None => false, + } + } + + /// Should be called after we advertised our collation to the given `peer` to keep track of it. + fn advertised_to_peer(&mut self, peer: &PeerId) { + if let Some(validator_id) = self.peer_ids.get(peer) { + self.advertised_to.insert(validator_id.clone()); + } + } + + /// Add a [`PeerId`] that belongs to the given [`ValidatorId`]. + /// + /// This returns `true` if the given validator belongs to this group and we could insert its [`PeerId`]. + fn add_peer_id_for_validator(&mut self, peer_id: &PeerId, validator_id: &ValidatorId) -> bool { + if !self.validator_ids.contains(validator_id) { + false + } else { + self.peer_ids.insert(peer_id.clone(), validator_id.clone()); + true + } + } +} + +impl From> for ValidatorGroup { + fn from(validator_ids: HashSet) -> Self { + Self { + validator_ids, + peer_ids: HashMap::new(), + advertised_to: HashSet::new(), + } + } +} + #[derive(Default)] struct State { /// Our id. @@ -141,24 +194,26 @@ struct State { /// We will keep up to one local collation per relay-parent. collations: HashMap, - /// Our validator groups active leafs. - our_validators_groups: HashMap>, + /// Our validator groups per active leaf. + our_validators_groups: HashMap, - /// Validators we know about via `ConnectToValidators` message. - /// - /// These are the only validators we are interested in talking to and as such - /// all actions from peers not in this map will be ignored. - /// Entries in this map will be cleared as validator groups in `our_validator_groups` - /// go out of scope with their respective deactivated leafs. - known_validators: HashMap, + /// List of peers where we declared ourself as a collator. + declared_at: HashSet, - /// Use to await for the next validator connection and revoke the request. - last_connection_request: Option, + /// The connection requests to validators per relay parent. + connection_requests: validator_discovery::ConnectionRequests, /// Metrics. metrics: Metrics, } +impl State { + /// Returns `true` if the given `peer` is interested in the leaf that is represented by `relay_parent`. + fn peer_interested_in_leaf(&self, peer: &PeerId, relay_parent: &Hash) -> bool { + self.peer_views.get(peer).map(|v| v.contains(relay_parent)).unwrap_or(false) + } +} + /// Distribute a collation. /// /// Figure out the core our para is assigned to and the relevant validators. @@ -168,16 +223,13 @@ struct State { /// as it must be invalid in that case - although this indicates a logic error /// elsewhere in the node. #[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))] -async fn distribute_collation( - ctx: &mut Context, +async fn distribute_collation( + ctx: &mut impl SubsystemContext, state: &mut State, id: ParaId, receipt: CandidateReceipt, pov: PoV, -) -> Result<()> -where - Context: SubsystemContext -{ +) -> Result<()> { let relay_parent = receipt.descriptor.relay_parent; // This collation is not in the active-leaves set. @@ -207,41 +259,28 @@ where relay_parent = %relay_parent, "looks like no core is assigned to {} at {}", id, relay_parent, ); - return Ok(()); + + return Ok(()) } }; // Determine the group on that core and the next group on that core. - let our_validators = match determine_our_validators(ctx, our_core, num_cores, relay_parent).await? { - Some(validators) => validators, - None => { - tracing::warn!( - target: LOG_TARGET, - core = ?our_core, - "there are no validators assigned to core", - ); + let (current_validators, next_validators) = determine_our_validators(ctx, our_core, num_cores, relay_parent).await?; - return Ok(()); - } - }; + if current_validators.is_empty() && next_validators.is_empty() { + tracing::warn!( + target: LOG_TARGET, + core = ?our_core, + "there are no validators assigned to core", + ); - state.our_validators_groups.insert(relay_parent, our_validators.clone()); - - // We may be already connected to some of the validators. In that case, - // advertise a collation to them right away. - for validator in our_validators.iter() { - if let Some(peer) = state.known_validators.get(&validator) { - if let Some(view) = state.peer_views.get(peer) { - if view.contains(&relay_parent) { - let peer = peer.clone(); - advertise_collation(ctx, state, relay_parent, vec![peer]).await?; - } - } - } + return Ok(()) } // Issue a discovery request for the validators of the current group and the next group. - connect_to_validators(ctx, relay_parent, state, our_validators).await?; + connect_to_validators(ctx, relay_parent, state, current_validators.union(&next_validators).cloned().collect()).await?; + + state.our_validators_groups.insert(relay_parent, current_validators.into()); state.collations.insert(relay_parent, (receipt, pov)); @@ -251,14 +290,11 @@ where /// Get the Id of the Core that is assigned to the para being collated on if any /// and the total number of cores. #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] -async fn determine_core( - ctx: &mut Context, +async fn determine_core( + ctx: &mut impl SubsystemContext, para_id: ParaId, relay_parent: Hash, -) -> Result> -where - Context: SubsystemContext -{ +) -> Result> { let cores = request_availability_cores_ctx(relay_parent, ctx).await?.await??; for (idx, core) in cores.iter().enumerate() { @@ -272,63 +308,46 @@ where Ok(None) } -/// Figure out a group of validators assigned to the para being collated on. +/// Figure out current and next group of validators assigned to the para being collated on. /// -/// This returns validators for the current group and the next group. +/// Returns [`ValidatorId`]'s of current and next group as determined based on the `relay_parent`. #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] -async fn determine_our_validators( - ctx: &mut Context, +async fn determine_our_validators( + ctx: &mut impl SubsystemContext, core_index: CoreIndex, cores: usize, relay_parent: Hash, -) -> Result>> -where - Context: SubsystemContext -{ +) -> Result<(HashSet, HashSet)> { let groups = request_validator_groups_ctx(relay_parent, ctx).await?; let groups = groups.await??; let current_group_index = groups.1.group_for_core(core_index, cores); - - let mut connect_to_validators = match groups.0.get(current_group_index.0 as usize) { - Some(group) => group.clone(), - None => return Ok(None), - }; + let current_validators = groups.0.get(current_group_index.0 as usize).map(|v| v.as_slice()).unwrap_or_default(); let next_group_idx = (current_group_index.0 as usize + 1) % groups.0.len(); + let next_validators = groups.0.get(next_group_idx).map(|v| v.as_slice()).unwrap_or_default(); - if let Some(next_group) = groups.0.get(next_group_idx) { - connect_to_validators.extend_from_slice(&next_group); - } + let validators = request_validators_ctx(relay_parent, ctx).await?.await??; - let validators = request_validators_ctx(relay_parent, ctx).await?; + let current_validators = current_validators.iter().map(|i| validators[*i as usize].clone()).collect(); + let next_validators = next_validators.iter().map(|i| validators[*i as usize].clone()).collect(); - let validators = validators.await??; - - let validators = connect_to_validators - .into_iter() - .map(|idx| validators[idx as usize].clone()) - .collect(); - - Ok(Some(validators)) + Ok((current_validators, next_validators)) } -/// Issue a `Declare` collation message to a set of peers. +/// Issue a `Declare` collation message to the given `peer`. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn declare( - ctx: &mut Context, +async fn declare( + ctx: &mut impl SubsystemContext, state: &mut State, - to: Vec, -) -> Result<()> -where - Context: SubsystemContext -{ + peer: PeerId, +) -> Result<()> { let wire_message = protocol_v1::CollatorProtocolMessage::Declare(state.our_id.clone()); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendCollationMessage( - to, + vec![peer], protocol_v1::CollationProtocol::CollatorProtocol(wire_message), ) )).await?; @@ -339,41 +358,34 @@ where /// Issue a connection request to a set of validators and /// revoke the previous connection request. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn connect_to_validators( - ctx: &mut Context, +async fn connect_to_validators( + ctx: &mut impl SubsystemContext, relay_parent: Hash, state: &mut State, validators: Vec, -) -> Result<()> -where - Context: SubsystemContext -{ - if let Some(request) = state.last_connection_request.take() { - drop(request); - } - +) -> Result<()> { let request = validator_discovery::connect_to_validators( ctx, relay_parent, validators, ).await?; - state.last_connection_request = Some(request); + state.connection_requests.put(relay_parent, request); Ok(()) } -/// Advertise collation to a set of relay chain validators. +/// Advertise collation to the given `peer`. +/// +/// This will only advertise a collation if there exists one for the given `relay_parent` and the given `peer` is +/// set as validator for our para at the given `relay_parent`. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn advertise_collation( - ctx: &mut Context, +async fn advertise_collation( + ctx: &mut impl SubsystemContext, state: &mut State, relay_parent: Hash, - to: Vec, -) -> Result<()> -where - Context: SubsystemContext -{ + peer: PeerId, +) -> Result<()> { let collating_on = match state.collating_on { Some(collating_on) => collating_on, None => { @@ -381,15 +393,28 @@ where } }; + let should_advertise = state.our_validators_groups + .get(&relay_parent) + .map(|g| g.should_advertise_to(&peer)) + .unwrap_or(false); + + if !state.collations.contains_key(&relay_parent) || !should_advertise { + return Ok(()) + } + let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation(relay_parent, collating_on); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendCollationMessage( - to, + vec![peer.clone()], protocol_v1::CollationProtocol::CollatorProtocol(wire_message), ) )).await?; + if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) { + validators.advertised_to_peer(&peer); + } + state.metrics.on_advertisment_made(); Ok(()) @@ -397,14 +422,11 @@ where /// The main incoming message dispatching switch. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn process_msg( - ctx: &mut Context, +async fn process_msg( + ctx: &mut impl SubsystemContext, state: &mut State, msg: CollatorProtocolMessage, -) -> Result<()> -where - Context: SubsystemContext -{ +) -> Result<()> { use CollatorProtocolMessage::*; let _timer = state.metrics.time_process_msg(); @@ -475,17 +497,14 @@ where /// Issue a response to a previously requested collation. #[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))] -async fn send_collation( - ctx: &mut Context, +async fn send_collation( + ctx: &mut impl SubsystemContext, state: &mut State, request_id: RequestId, origin: PeerId, receipt: CandidateReceipt, pov: PoV, -) -> Result<()> -where - Context: SubsystemContext -{ +) -> Result<()> { let wire_message = protocol_v1::CollatorProtocolMessage::Collation( request_id, receipt, @@ -506,15 +525,12 @@ where /// A networking messages switch. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn handle_incoming_peer_message( - ctx: &mut Context, +async fn handle_incoming_peer_message( + ctx: &mut impl SubsystemContext, state: &mut State, origin: PeerId, msg: protocol_v1::CollatorProtocolMessage, -) -> Result<()> -where - Context: SubsystemContext -{ +) -> Result<()> { use protocol_v1::CollatorProtocolMessage::*; match msg { @@ -568,15 +584,12 @@ where /// Our view has changed. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn handle_peer_view_change( - ctx: &mut Context, +async fn handle_peer_view_change( + ctx: &mut impl SubsystemContext, state: &mut State, peer_id: PeerId, view: View, -) -> Result<()> -where - Context: SubsystemContext -{ +) -> Result<()> { let current = state.peer_views.entry(peer_id.clone()).or_default(); let added: Vec = view.difference(&*current).cloned().collect(); @@ -584,9 +597,7 @@ where *current = view; for added in added.into_iter() { - if state.collations.contains_key(&added) { - advertise_collation(ctx, state, added.clone(), vec![peer_id.clone()]).await?; - } + advertise_collation(ctx, state, added, peer_id.clone()).await?; } Ok(()) @@ -596,22 +607,30 @@ where /// /// `Declare` that we are a collator with a given `CollatorId`. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn handle_validator_connected( - ctx: &mut Context, +async fn handle_validator_connected( + ctx: &mut impl SubsystemContext, state: &mut State, peer_id: PeerId, validator_id: ValidatorId, -) -> Result<()> -where - Context: SubsystemContext -{ - // Check if the validator is already known or if maybe its peer id chaned(should not happen) - let unknown = state.known_validators.insert(validator_id, peer_id.clone()).map(|o| o != peer_id).unwrap_or(true); + relay_parent: Hash, +) -> Result<()> { + let not_declared = state.declared_at.insert(peer_id.clone()); - if unknown { - // Only declare the new peers. - declare(ctx, state, vec![peer_id.clone()]).await?; - state.peer_views.insert(peer_id, Default::default()); + if not_declared { + declare(ctx, state, peer_id.clone()).await?; + } + + // Store the PeerId and find out if we should advertise to this peer. + // + // If this peer does not belong to the para validators, we also don't need to try to advertise our collation. + let advertise = if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) { + validators.add_peer_id_for_validator(&peer_id, &validator_id) + } else { + false + }; + + if advertise && state.peer_interested_in_leaf(&peer_id, &relay_parent) { + advertise_collation(ctx, state, relay_parent, peer_id).await?; } Ok(()) @@ -619,14 +638,11 @@ where /// Bridge messages switch. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn handle_network_msg( - ctx: &mut Context, +async fn handle_network_msg( + ctx: &mut impl SubsystemContext, state: &mut State, bridge_message: NetworkBridgeEvent, -) -> Result<()> -where - Context: SubsystemContext -{ +) -> Result<()> { use NetworkBridgeEvent::*; match bridge_message { @@ -638,8 +654,8 @@ where handle_peer_view_change(ctx, state, peer_id, view).await?; } PeerDisconnected(peer_id) => { - state.known_validators.retain(|_, v| *v != peer_id); state.peer_views.remove(&peer_id); + state.declared_at.remove(&peer_id); } OurViewChange(view) => { handle_our_view_change(state, view).await?; @@ -658,59 +674,59 @@ async fn handle_our_view_change( state: &mut State, view: View, ) -> Result<()> { - let old_view = std::mem::replace(&mut (state.view), view); - - let view = state.view.clone(); - - let removed = old_view.difference(&view).collect::>(); - - for removed in removed.into_iter() { + for removed in state.view.difference(&view) { state.collations.remove(removed); state.our_validators_groups.remove(removed); + state.connection_requests.remove(removed); } + state.view = view; + Ok(()) } /// The collator protocol collator side main loop. #[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))] -pub(crate) async fn run( - mut ctx: Context, +pub(crate) async fn run( + mut ctx: impl SubsystemContext, our_id: CollatorId, metrics: Metrics, -) -> Result<()> -where - Context: SubsystemContext -{ +) -> Result<()> { use FromOverseer::*; use OverseerSignal::*; let mut state = State { metrics, + our_id, ..Default::default() }; - state.our_id = our_id; - loop { - if let Some(mut request) = state.last_connection_request.take() { - let _timer = state.metrics.time_handle_connection_request(); + select! { + res = state.connection_requests.next().fuse() => { + let (relay_parent, validator_id, peer_id) = match res { + Some(res) => res, + // Will never happen, but better to be safe. + None => continue, + }; - while let Poll::Ready(Some((validator_id, peer_id))) = futures::poll!(request.next()) { - if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id, validator_id).await { + let _timer = state.metrics.time_handle_connection_request(); + + if let Err(err) = handle_validator_connected( + &mut ctx, + &mut state, + peer_id, + validator_id, + relay_parent, + ).await { tracing::warn!( target: LOG_TARGET, err = ?err, "Failed to declare our collator id", ); } - } - // put it back - state.last_connection_request = Some(request); - } - - while let Poll::Ready(msg) = futures::poll!(ctx.recv()) { - match msg? { + }, + msg = ctx.recv().fuse() => match msg? { Communication { msg } => { if let Err(e) = process_msg(&mut ctx, &mut state, msg).await { tracing::warn!(target: LOG_TARGET, err = ?e, "Failed to process message"); @@ -721,8 +737,6 @@ where Signal(Conclude) => return Ok(()), } } - - futures::pending!() } } @@ -733,7 +747,7 @@ mod tests { use std::time::Duration; use assert_matches::assert_matches; - use futures::{executor, future, Future}; + use futures::{executor, future, Future, channel::mpsc}; use smallvec::smallvec; use sp_core::crypto::Pair; @@ -746,7 +760,6 @@ mod tests { use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}}; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_subsystem_testhelpers as test_helpers; - use polkadot_node_network_protocol::ObservedRole; #[derive(Default)] struct TestCandidateBuilder { @@ -772,14 +785,14 @@ mod tests { #[derive(Clone)] struct TestState { - chain_ids: Vec, + para_id: ParaId, validators: Vec, validator_public: Vec, validator_authority_id: Vec, validator_peer_id: Vec, validator_groups: (Vec>, GroupRotationInfo), relay_parent: Hash, - availability_cores: Vec, + availability_core: CoreState, our_collator_pair: CollatorPair, } @@ -793,10 +806,7 @@ mod tests { impl Default for TestState { fn default() -> Self { - let chain_a = ParaId::from(1); - let chain_b = ParaId::from(2); - - let chain_ids = vec![chain_a, chain_b]; + let para_id = ParaId::from(1); let validators = vec![ Sr25519Keyring::Alice, @@ -813,7 +823,7 @@ mod tests { .take(validator_public.len()) .collect(); - let validator_groups = vec![vec![2, 0, 4], vec![1], vec![3]]; + let validator_groups = vec![vec![2, 0, 4], vec![3, 2, 4]]; let group_rotation_info = GroupRotationInfo { session_start_block: 0, group_rotation_frequency: 100, @@ -821,37 +831,107 @@ mod tests { }; let validator_groups = (validator_groups, group_rotation_info); - let availability_cores = vec![ - CoreState::Scheduled(ScheduledCore { - para_id: chain_ids[0], - collator: None, - }), - CoreState::Scheduled(ScheduledCore { - para_id: chain_ids[1], - collator: None, - }), - ]; + let availability_core = CoreState::Scheduled(ScheduledCore { + para_id, + collator: None, + }); - let relay_parent = Hash::repeat_byte(0x05); + let relay_parent = Hash::random(); let our_collator_pair = CollatorPair::generate().0; Self { - chain_ids, + para_id, validators, validator_public, validator_authority_id, validator_peer_id, validator_groups, relay_parent, - availability_cores, + availability_core, our_collator_pair, } } } + impl TestState { + fn current_group_validator_indices(&self) -> &[ValidatorIndex] { + &self.validator_groups.0[0] + } + + fn current_group_validator_peer_ids(&self) -> Vec { + self.current_group_validator_indices().iter().map(|i| self.validator_peer_id[*i as usize].clone()).collect() + } + + fn current_group_validator_authority_ids(&self) -> Vec { + self.current_group_validator_indices() + .iter() + .map(|i| self.validator_authority_id[*i as usize].clone()) + .collect() + } + + fn current_group_validator_ids(&self) -> Vec { + self.current_group_validator_indices() + .iter() + .map(|i| self.validator_public[*i as usize].clone()) + .collect() + } + + fn next_group_validator_indices(&self) -> &[ValidatorIndex] { + &self.validator_groups.0[1] + } + + fn next_group_validator_authority_ids(&self) -> Vec { + self.next_group_validator_indices() + .iter() + .map(|i| self.validator_authority_id[*i as usize].clone()) + .collect() + } + + fn next_group_validator_ids(&self) -> Vec { + self.next_group_validator_indices() + .iter() + .map(|i| self.validator_public[*i as usize].clone()) + .collect() + } + + /// Returns the unique count of validators in the current and next group. + fn current_and_next_group_unique_validator_count(&self) -> usize { + let mut indices = self.next_group_validator_indices().iter().collect::>(); + indices.extend(self.current_group_validator_indices()); + indices.len() + } + + /// Generate a new relay parent and inform the subsystem about the new view. + /// + /// If `merge_views == true` it means the subsystem will be informed that we working on the old `relay_parent` + /// and the new one. + async fn advance_to_new_round(&mut self, virtual_overseer: &mut VirtualOverseer, merge_views: bool) { + let old_relay_parent = self.relay_parent; + + while self.relay_parent == old_relay_parent { + self.relay_parent.randomize(); + } + + let hashes = if merge_views { + vec![old_relay_parent, self.relay_parent] + } else { + vec![self.relay_parent] + }; + + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(View(hashes)), + ), + ).await; + } + } + + type VirtualOverseer = test_helpers::TestSubsystemContextHandle; + struct TestHarness { - virtual_overseer: test_helpers::TestSubsystemContextHandle, + virtual_overseer: VirtualOverseer, } fn test_harness>( @@ -932,190 +1012,256 @@ mod tests { .expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT)); } + // Setup the system by sending the `CollateOn`, `ActiveLeaves` and `OurViewChange` messages. + async fn setup_system(virtual_overseer: &mut VirtualOverseer, test_state: &TestState) { + overseer_send( + virtual_overseer, + CollatorProtocolMessage::CollateOn(test_state.para_id), + ).await; + + overseer_signal( + virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![test_state.relay_parent], + deactivated: smallvec![], + }), + ).await; + + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(View(vec![test_state.relay_parent])), + ), + ).await; + } + + /// Result of [`distribute_collation`] + struct DistributeCollation { + /// Should be used to inform the subsystem about connected validators. + connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, + candidate: CandidateReceipt, + pov_block: PoV, + } + + /// Create some PoV and distribute it. + async fn distribute_collation( + virtual_overseer: &mut VirtualOverseer, + test_state: &TestState, + ) -> DistributeCollation { + // Now we want to distribute a PoVBlock + let pov_block = PoV { + block_data: BlockData(vec![42, 43, 44]), + }; + + let pov_hash = pov_block.hash(); + + let candidate = TestCandidateBuilder { + para_id: test_state.para_id, + relay_parent: test_state.relay_parent, + pov_hash, + ..Default::default() + }.build(); + + overseer_send( + virtual_overseer, + CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()), + ).await; + + // obtain the availability cores. + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::AvailabilityCores(tx) + )) => { + assert_eq!(relay_parent, test_state.relay_parent); + tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap(); + } + ); + + // Obtain the validator groups + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::ValidatorGroups(tx) + )) => { + assert_eq!(relay_parent, test_state.relay_parent); + tx.send(Ok(test_state.validator_groups.clone())).unwrap(); + } + ); + + // obtain the validators per relay parent + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(tx), + )) => { + assert_eq!(relay_parent, test_state.relay_parent); + tx.send(Ok(test_state.validator_public.clone())).unwrap(); + } + ); + + // obtain the validator_id to authority_id mapping + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::ValidatorDiscovery(validators, tx), + )) => { + assert_eq!(relay_parent, test_state.relay_parent); + assert_eq!(validators.len(), test_state.current_and_next_group_unique_validator_count()); + + let current_validators = test_state.current_group_validator_ids(); + let next_validators = test_state.next_group_validator_ids(); + + assert!(validators.iter().all(|v| current_validators.contains(&v) || next_validators.contains(&v))); + + let current_validators = test_state.current_group_validator_authority_ids(); + let next_validators = test_state.next_group_validator_authority_ids(); + + tx.send(Ok(current_validators.into_iter().chain(next_validators).map(Some).collect())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ConnectToValidators { + connected, + .. + } + ) => { + DistributeCollation { + connected, + candidate, + pov_block, + } + } + ) + } + + /// Connect a peer + async fn connect_peer(virtual_overseer: &mut VirtualOverseer, peer: PeerId) { + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer.clone(), + polkadot_node_network_protocol::ObservedRole::Authority, + ), + ), + ).await; + + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer, View(Default::default())), + ), + ).await; + } + + /// Disconnect a peer + async fn disconnect_peer(virtual_overseer: &mut VirtualOverseer, peer: PeerId) { + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerDisconnected(peer)), + ).await; + } + + /// Check that the next received message is a `Declare` message. + async fn expect_declare_msg(virtual_overseer: &mut VirtualOverseer, test_state: &TestState, peer: &PeerId) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + to, + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + ) => { + assert_eq!(to[0], *peer); + assert_matches!( + wire_message, + protocol_v1::CollatorProtocolMessage::Declare(collator_id) => { + assert_eq!(collator_id, test_state.our_collator_pair.public()); + } + ); + } + ); + } + + /// Check that the next received message is a collation advertisment message. + async fn expect_advertise_collation_msg( + virtual_overseer: &mut VirtualOverseer, + test_state: &TestState, + peer: &PeerId, + expected_relay_parent: Hash, + ) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + to, + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + ) => { + assert_eq!(to[0], *peer); + assert_matches!( + wire_message, + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + relay_parent, + collating_on, + ) => { + assert_eq!(relay_parent, expected_relay_parent); + assert_eq!(collating_on, test_state.para_id); + } + ); + } + ); + } + + /// Send a message that the given peer's view changed. + async fn send_peer_view_change(virtual_overseer: &mut VirtualOverseer, peer: &PeerId, hashes: Vec) { + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer.clone(), View(hashes)), + ), + ).await; + } + #[test] fn advertise_and_send_collation() { - let test_state = TestState::default(); + let mut test_state = TestState::default(); test_harness(test_state.our_collator_pair.public(), |test_harness| async move { - let current = test_state.relay_parent; let mut virtual_overseer = test_harness.virtual_overseer; - let pov_block = PoV { - block_data: BlockData(vec![42, 43, 44]), - }; + setup_system(&mut virtual_overseer, &test_state).await; - let pov_hash = pov_block.hash(); - - let candidate = TestCandidateBuilder { - para_id: test_state.chain_ids[0], - relay_parent: test_state.relay_parent, - pov_hash, - ..Default::default() - }.build(); - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::CollateOn(test_state.chain_ids[0]) - ).await; - - overseer_signal( - &mut virtual_overseer, - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![current.clone()], - deactivated: smallvec![], - }), - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(View(vec![current])), - ), - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()), - ).await; - - // obtain the availability cores. - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::AvailabilityCores(tx) - )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(test_state.availability_cores.clone())).unwrap(); - } - ); - - // Obtain the validator groups - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::ValidatorGroups(tx) - )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(test_state.validator_groups.clone())).unwrap(); - } - ); - - // obtain the validators per relay parent - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Validators(tx), - )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(test_state.validator_public.clone())).unwrap(); - } - ); - - // obtain the validator_id to authority_id mapping - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::ValidatorDiscovery(validators, tx), - )) => { - assert_eq!(relay_parent, current); - assert_eq!(validators.len(), 4); - assert!(validators.iter().all(|v| test_state.validator_public.contains(&v))); - - let result = vec![ - Some(test_state.validator_authority_id[2].clone()), - Some(test_state.validator_authority_id[0].clone()), - Some(test_state.validator_authority_id[4].clone()), - Some(test_state.validator_authority_id[1].clone()), - ]; - tx.send(Ok(result)).unwrap(); - } - ); - - // We now should connect to our validator group. - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ConnectToValidators { - validator_ids, - mut connected, - .. - } - ) => { - assert_eq!(validator_ids.len(), 4); - assert!(validator_ids.iter().all(|id| test_state.validator_authority_id.contains(id))); - - let result = vec![ - (test_state.validator_authority_id[2].clone(), test_state.validator_peer_id[2].clone()), - (test_state.validator_authority_id[0].clone(), test_state.validator_peer_id[0].clone()), - (test_state.validator_authority_id[4].clone(), test_state.validator_peer_id[4].clone()), - (test_state.validator_authority_id[1].clone(), test_state.validator_peer_id[1].clone()), - ]; - - result.into_iter().for_each(|r| connected.try_send(r).unwrap()); - } - ); + let DistributeCollation { mut connected, candidate, pov_block } = + distribute_collation(&mut virtual_overseer, &test_state).await; + test_state.current_group_validator_authority_ids() + .into_iter() + .zip(test_state.current_group_validator_peer_ids()) + .for_each(|r| connected.try_send(r).unwrap()); // We declare to the connected validators that we are a collator. // We need to catch all `Declare` messages to the validators we've // previosly connected to. - for i in vec![2, 0, 4, 1].into_iter() { - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( - to, - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), - ) - ) => { - assert_eq!(to, vec![test_state.validator_peer_id[i].clone()]); - assert_matches!( - wire_message, - protocol_v1::CollatorProtocolMessage::Declare(collator_id) => { - assert_eq!(collator_id, test_state.our_collator_pair.public()); - } - ); - } - ); + for peer_id in test_state.current_group_validator_peer_ids() { + expect_declare_msg(&mut virtual_overseer, &test_state, &peer_id).await; } + let peer = test_state.current_group_validator_peer_ids()[0].clone(); + // Send info about peer's view. - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerViewChange( - test_state.validator_peer_id[2].clone(), - View(vec![current]), - ) - ) - ).await; + send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await; // The peer is interested in a leaf that we have a collation for; // advertise it. - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( - to, - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), - ) - ) => { - assert_eq!(to, vec![test_state.validator_peer_id[2].clone()]); - assert_matches!( - wire_message, - protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - relay_parent, - collating_on, - ) => { - assert_eq!(relay_parent, current); - assert_eq!(collating_on, test_state.chain_ids[0]); - } - ); - } - ); + expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await; let request_id = 42; @@ -1124,11 +1270,11 @@ mod tests { &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerMessage( - test_state.validator_peer_id[2].clone(), + peer.clone(), protocol_v1::CollatorProtocolMessage::RequestCollation( request_id, - current, - test_state.chain_ids[0], + test_state.relay_parent, + test_state.para_id, ) ) ) @@ -1143,7 +1289,7 @@ mod tests { protocol_v1::CollationProtocol::CollatorProtocol(wire_message), ) ) => { - assert_eq!(to, vec![test_state.validator_peer_id[2].clone()]); + assert_eq!(to, vec![peer]); assert_matches!( wire_message, protocol_v1::CollatorProtocolMessage::Collation(req_id, receipt, pov) => { @@ -1155,28 +1301,21 @@ mod tests { } ); - let new_head = Hash::repeat_byte(0xA); + let old_relay_parent = test_state.relay_parent; + test_state.advance_to_new_round(&mut virtual_overseer, false).await; - // Collator's view moves on. - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(View(vec![new_head])), - ), - ).await; - - let request_id = 43; + let peer = test_state.validator_peer_id[2].clone(); // Re-request a collation. overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerMessage( - test_state.validator_peer_id[2].clone(), + peer.clone(), protocol_v1::CollatorProtocolMessage::RequestCollation( - request_id, - current, - test_state.chain_ids[0], + 43, + old_relay_parent, + test_state.para_id, ) ) ) @@ -1184,123 +1323,25 @@ mod tests { assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none()); - let pov_block = PoV { - block_data: BlockData(vec![45, 46, 47]), - }; - - let pov_hash = pov_block.hash(); - let current = Hash::repeat_byte(33); - - let candidate = TestCandidateBuilder { - para_id: test_state.chain_ids[0], - relay_parent: current, - pov_hash, - ..Default::default() - }.build(); - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(View(vec![current])), - ), - ).await; + let DistributeCollation { mut connected, .. } = + distribute_collation(&mut virtual_overseer, &test_state).await; + test_state.current_group_validator_authority_ids() + .into_iter() + .zip(test_state.current_group_validator_peer_ids()) + .for_each(|r| connected.try_send(r).unwrap()); // Send info about peer's view. overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerViewChange( - test_state.validator_peer_id[2].clone(), - View(vec![current]), + peer.clone(), + View(vec![test_state.relay_parent]), ) ) ).await; - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()), - ).await; - - // obtain the availability cores. - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::AvailabilityCores(tx) - )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(test_state.availability_cores.clone())).unwrap(); - } - ); - - // Obtain the validator groups - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::ValidatorGroups(tx) - )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(test_state.validator_groups.clone())).unwrap(); - } - ); - - // obtain the validators per relay parent - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Validators(tx), - )) => { - assert_eq!(relay_parent, current); - tx.send(Ok(test_state.validator_public.clone())).unwrap(); - } - ); - - // The peer is interested in a leaf that we have a collation for; - // advertise it. - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( - to, - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), - ) - ) => { - assert_eq!(to, vec![test_state.validator_peer_id[2].clone()]); - assert_matches!( - wire_message, - protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - relay_parent, - collating_on, - ) => { - assert_eq!(relay_parent, current); - assert_eq!(collating_on, test_state.chain_ids[0]); - } - ); - } - ); - - // obtain the validator_id to authority_id mapping - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::ValidatorDiscovery(validators, tx), - )) => { - assert_eq!(relay_parent, current); - assert_eq!(validators.len(), 4); - assert!(validators.iter().all(|p| test_state.validator_public.contains(p))); - - let result = vec![ - Some(test_state.validator_authority_id[2].clone()), - Some(test_state.validator_authority_id[0].clone()), - Some(test_state.validator_authority_id[4].clone()), - Some(test_state.validator_authority_id[1].clone()), - ]; - tx.send(Ok(result)).unwrap(); - } - ); + expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await; }); } @@ -1316,148 +1357,132 @@ mod tests { let peer = test_state.validator_peer_id[0].clone(); let validator_id = test_state.validator_authority_id[0].clone(); - // Setup the system correctly - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::CollateOn(test_state.chain_ids[0]), - ).await; - - overseer_signal( - &mut virtual_overseer, - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![test_state.relay_parent], - deactivated: smallvec![], - }), - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(View(vec![test_state.relay_parent])), - ), - ).await; + setup_system(&mut virtual_overseer, &test_state).await; // A validator connected to us - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Authority), - ), - ).await; + connect_peer(&mut virtual_overseer, peer.clone()).await; - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerViewChange(peer.clone(), View(Default::default())), - ), - ).await; + let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected; + connected.try_send((validator_id, peer.clone())).unwrap(); - // Now we want to distribute a PoVBlock - let pov_block = PoV { - block_data: BlockData(vec![42, 43, 44]), - }; + expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await; + }) + } - let pov_hash = pov_block.hash(); + #[test] + fn collations_are_only_advertised_to_validators_with_correct_view() { + let test_state = TestState::default(); - let candidate = TestCandidateBuilder { - para_id: test_state.chain_ids[0], - relay_parent: test_state.relay_parent, - pov_hash, - ..Default::default() - }.build(); + test_harness(test_state.our_collator_pair.public(), |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()), - ).await; + let peer = test_state.current_group_validator_peer_ids()[0].clone(); + let validator_id = test_state.current_group_validator_authority_ids()[0].clone(); - // obtain the availability cores. - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::AvailabilityCores(tx) - )) => { - assert_eq!(relay_parent, test_state.relay_parent); - tx.send(Ok(test_state.availability_cores.clone())).unwrap(); - } - ); + let peer2 = test_state.current_group_validator_peer_ids()[1].clone(); + let validator_id2 = test_state.current_group_validator_authority_ids()[1].clone(); - // Obtain the validator groups - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::ValidatorGroups(tx) - )) => { - assert_eq!(relay_parent, test_state.relay_parent); - tx.send(Ok(test_state.validator_groups.clone())).unwrap(); - } - ); + setup_system(&mut virtual_overseer, &test_state).await; - // obtain the validators per relay parent - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Validators(tx), - )) => { - assert_eq!(relay_parent, test_state.relay_parent); - tx.send(Ok(test_state.validator_public.clone())).unwrap(); - } - ); + // A validator connected to us + connect_peer(&mut virtual_overseer, peer.clone()).await; - // obtain the validator_id to authority_id mapping - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::ValidatorDiscovery(validators, tx), - )) => { - assert_eq!(relay_parent, test_state.relay_parent); - assert_eq!(validators.len(), 4); - assert!(validators.iter().all(|v| test_state.validator_public.contains(&v))); + // Connect the second validator + connect_peer(&mut virtual_overseer, peer2.clone()).await; - let result = vec![ - Some(test_state.validator_authority_id[2].clone()), - Some(test_state.validator_authority_id[0].clone()), - Some(test_state.validator_authority_id[4].clone()), - Some(test_state.validator_authority_id[1].clone()), - ]; - tx.send(Ok(result)).unwrap(); - } - ); + // And let it tell us that it is has the same view. + send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ConnectToValidators { - mut connected, - .. - } - ) => { - connected.try_send((validator_id, peer.clone())).unwrap(); - } - ); + let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected; + connected.try_send((validator_id, peer.clone())).unwrap(); + connected.try_send((validator_id2, peer2.clone())).unwrap(); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( - peer_id, - msg, - ) - ) => { - assert_matches!( - msg, - protocol_v1::CollationProtocol::CollatorProtocol( - protocol_v1::CollatorProtocolMessage::Declare(collator_id), - ) if collator_id == test_state.our_collator_pair.public() - ); + expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await; + expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await; - assert_eq!(peer, peer_id[0]); - } - ); + expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer2, test_state.relay_parent).await; + + // The other validator announces that it changed its view. + send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await; + + // After changing the view we should receive the advertisement + expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await; + }) + } + + #[test] + fn collate_on_two_different_relay_chain_blocks() { + let mut test_state = TestState::default(); + + test_harness(test_state.our_collator_pair.public(), |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let peer = test_state.current_group_validator_peer_ids()[0].clone(); + let validator_id = test_state.current_group_validator_authority_ids()[0].clone(); + + let peer2 = test_state.current_group_validator_peer_ids()[1].clone(); + let validator_id2 = test_state.current_group_validator_authority_ids()[1].clone(); + + setup_system(&mut virtual_overseer, &test_state).await; + + // A validator connected to us + connect_peer(&mut virtual_overseer, peer.clone()).await; + + // Connect the second validator + connect_peer(&mut virtual_overseer, peer2.clone()).await; + + let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected; + connected.try_send((validator_id.clone(), peer.clone())).unwrap(); + connected.try_send((validator_id2.clone(), peer2.clone())).unwrap(); + + expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await; + expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await; + + let old_relay_parent = test_state.relay_parent; + + // Advance to a new round, while informing the subsystem that the old and the new relay parent are active. + test_state.advance_to_new_round(&mut virtual_overseer, true).await; + + let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected; + connected.try_send((validator_id, peer.clone())).unwrap(); + connected.try_send((validator_id2, peer2.clone())).unwrap(); + + send_peer_view_change(&mut virtual_overseer, &peer, vec![old_relay_parent]).await; + expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, old_relay_parent).await; + + send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await; + expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer2, test_state.relay_parent).await; + }) + } + + #[test] + fn validator_reconnect_does_not_advertise_a_second_time() { + let test_state = TestState::default(); + + test_harness(test_state.our_collator_pair.public(), |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + let peer = test_state.current_group_validator_peer_ids()[0].clone(); + let validator_id = test_state.current_group_validator_authority_ids()[0].clone(); + + setup_system(&mut virtual_overseer, &test_state).await; + + // A validator connected to us + connect_peer(&mut virtual_overseer, peer.clone()).await; + + let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected; + connected.try_send((validator_id.clone(), peer.clone())).unwrap(); + + expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await; + send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await; + expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await; + + // Disconnect and reconnect directly + disconnect_peer(&mut virtual_overseer, peer.clone()).await; + connect_peer(&mut virtual_overseer, peer.clone()).await; + send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await; + + assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none()); }) } } diff --git a/polkadot/node/network/collator-protocol/src/lib.rs b/polkadot/node/network/collator-protocol/src/lib.rs index 5909c0b8e6..73ed2c956b 100644 --- a/polkadot/node/network/collator-protocol/src/lib.rs +++ b/polkadot/node/network/collator-protocol/src/lib.rs @@ -18,6 +18,7 @@ //! This subsystem implements both sides of the collator protocol. #![deny(missing_docs, unused_crate_dependencies)] +#![recursion_limit="256"] use std::time::Duration; use futures::{channel::oneshot, FutureExt, TryFutureExt};