From 994d621f2cf40b4dda6d1176bfd0df1fd88536df Mon Sep 17 00:00:00 2001
From: Fedor Sakharov
Date: Wed, 25 Nov 2020 12:20:54 +0300
Subject: [PATCH] PoV Distribution optimization (#1990)

* Initial commit
* Remove unnecessary struct
* Some review nits
* Update node/network/pov-distribution/src/lib.rs
* Update parachain/test-parachains/adder/collator/tests/integration.rs
* Review nits
* notify_all_we_are_awaiting
* Both ways of peer connections should work the same
* Add mod-level docs to error.rs
* Avoid multiple connection requests at the same parent
* Don't bail on errors
* FusedStream for ConnectionRequests
* Fix build after merge
* Improve error handling
* Remove whitespace formatting
---
 polkadot/Cargo.lock                           |   5 +
 .../node/network/pov-distribution/Cargo.toml  |   8 +
 .../network/pov-distribution/src/error.rs     |  35 +
 .../node/network/pov-distribution/src/lib.rs  | 277 ++++--
 .../network/pov-distribution/src/tests.rs     | 644 +++++++++++++++++-
 .../subsystem-util/src/validator_discovery.rs |  11 +
 6 files changed, 900 insertions(+), 80 deletions(-)
 create mode 100644 polkadot/node/network/pov-distribution/src/error.rs

diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index a78cf0038d..97f4c52ab9 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -5202,13 +5202,18 @@ name = "polkadot-pov-distribution"
 version = "0.1.0"
 dependencies = [
  "assert_matches",
+ "env_logger 0.8.2",
  "futures 0.3.8",
+ "log",
  "polkadot-node-network-protocol",
  "polkadot-node-subsystem",
  "polkadot-node-subsystem-test-helpers",
  "polkadot-node-subsystem-util",
  "polkadot-primitives",
+ "smallvec 1.5.0",
  "sp-core",
+ "sp-keyring",
+ "thiserror",
  "tracing",
  "tracing-futures",
 ]

diff --git a/polkadot/node/network/pov-distribution/Cargo.toml b/polkadot/node/network/pov-distribution/Cargo.toml
index 6ef2d10a56..6f2bc640b0 100644
--- a/polkadot/node/network/pov-distribution/Cargo.toml
+++ b/polkadot/node/network/pov-distribution/Cargo.toml
@@ -6,8 +6,10 @@ edition = "2018"
 
 [dependencies]
 futures = "0.3.8"
+thiserror = "1.0.21"
 tracing = "0.1.22"
 tracing-futures = "0.2.4"
+
 polkadot-primitives = { path = "../../../primitives" }
 polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
 polkadot-node-subsystem-util = { path = "../../subsystem-util" }
@@ -15,5 +17,11 @@ polkadot-node-network-protocol = { path = "../../network/protocol" }
 
 [dev-dependencies]
 assert_matches = "1.4.0"
+env_logger = "0.8.1"
+log = "0.4.11"
+smallvec = "1.4.2"
+
 sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
 polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }

diff --git a/polkadot/node/network/pov-distribution/src/error.rs b/polkadot/node/network/pov-distribution/src/error.rs
new file mode 100644
index 0000000000..625ea4c7fd
--- /dev/null
+++ b/polkadot/node/network/pov-distribution/src/error.rs
@@ -0,0 +1,35 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! The `Error` and `Result` types used by the subsystem.
+
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum Error {
+	#[error(transparent)]
+	Subsystem(#[from] polkadot_subsystem::SubsystemError),
+	#[error(transparent)]
+	OneshotRecv(#[from] futures::channel::oneshot::Canceled),
+	#[error(transparent)]
+	Runtime(#[from] polkadot_subsystem::errors::RuntimeApiError),
+	#[error(transparent)]
+	ValidatorDiscovery(#[from] polkadot_node_subsystem_util::validator_discovery::Error),
+	#[error(transparent)]
+	Util(#[from] polkadot_node_subsystem_util::Error),
+}
+
+pub type Result<T> = std::result::Result<T, Error>;

diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs
index 01b86b44a0..1b59441fc0 100644
--- a/polkadot/node/network/pov-distribution/src/lib.rs
+++ b/polkadot/node/network/pov-distribution/src/lib.rs
@@ -22,16 +22,26 @@
 #![deny(unused_crate_dependencies)]
 #![warn(missing_docs)]
 
-use polkadot_primitives::v1::{Hash, PoV, CandidateDescriptor};
+use polkadot_primitives::v1::{
+	Hash, PoV, CandidateDescriptor, ValidatorId, Id as ParaId, CoreIndex, CoreState,
+};
 use polkadot_subsystem::{
-	ActiveLeavesUpdate, OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, SubsystemError,
+	ActiveLeavesUpdate, OverseerSignal, SubsystemContext, SubsystemResult, SubsystemError, Subsystem,
 	FromOverseer, SpawnedSubsystem,
 	messages::{
-		PoVDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, AllMessages, NetworkBridgeMessage,
+		PoVDistributionMessage, AllMessages, NetworkBridgeMessage,
 	},
 };
-use polkadot_node_subsystem_util::metrics::{self, prometheus};
-use polkadot_node_network_protocol::{v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View};
+use polkadot_node_subsystem_util::{
+	validator_discovery,
+	request_validators_ctx,
+	request_validator_groups_ctx,
+	request_availability_cores_ctx,
+	metrics::{self, prometheus},
+};
+use polkadot_node_network_protocol::{
+	v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View,
+};
 
 use futures::prelude::*;
 use futures::channel::oneshot;
@@ -39,6 +49,8 @@ use futures::channel::oneshot;
 use std::collections::{hash_map::{Entry, HashMap}, HashSet};
 use std::sync::Arc;
 
+mod error;
+
 #[cfg(test)]
 mod tests;
 
@@ -75,20 +87,33 @@ impl<C> Subsystem<C> for PoVDistribution
 	}
 }
 
+#[derive(Default)]
 struct State {
+	/// State tracked on a per-relay-parent basis.
 	relay_parent_state: HashMap<Hash, BlockBasedState>,
+
+	/// Info on peers.
 	peer_state: HashMap<PeerId, PeerState>,
+
+	/// Our own view.
 	our_view: View,
+
+	/// Connection requests to relevant groups of validators at different relay parents.
+	connection_requests: validator_discovery::ConnectionRequests,
+
+	/// Metrics.
 	metrics: Metrics,
 }
 
 struct BlockBasedState {
 	known: HashMap<Hash, Arc<PoV>>,
+
 	/// All the PoVs we are or were fetching, coupled with channels expecting the data.
 	///
 	/// This may be an empty list, which indicates that we were once awaiting this PoV but have
 	/// received it already.
 	fetching: HashMap<Hash, Vec<oneshot::Sender<Arc<PoV>>>>,
+
 	n_validators: usize,
 }
 
@@ -128,38 +153,45 @@ async fn handle_signal(
 	let _timer = state.metrics.time_handle_signal();
 
 	for relay_parent in activated {
-		let (vals_tx, vals_rx) = oneshot::channel();
-		ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-			relay_parent,
-			RuntimeApiRequest::Validators(vals_tx),
-		))).await;
+		match request_validators_ctx(relay_parent.clone(), ctx).await {
+			Ok(vals_rx) => {
+				let n_validators = match vals_rx.await? {
+					Ok(v) => v.len(),
+					Err(e) => {
+						tracing::warn!(
+							target: LOG_TARGET,
+							err = ?e,
+							"Error fetching validators from runtime API for active leaf",
+						);
 
-		let n_validators = match vals_rx.await? {
-			Ok(v) => v.len(),
+						// Not adding bookkeeping here might make us behave funny, but we
+						// shouldn't take down the node on spurious runtime API errors.
+						//
+						// and this is "behave funny" as in be bad at our job, but not in any
+						// slashable or security-related way.
+						continue;
+					}
+				};
+
+				state.relay_parent_state.insert(relay_parent, BlockBasedState {
+					known: HashMap::new(),
+					fetching: HashMap::new(),
+					n_validators,
+				});
+			}
 			Err(e) => {
+				// As above, continue on error instead of taking the node down.
 				tracing::warn!(
 					target: LOG_TARGET,
 					err = ?e,
 					"Error fetching validators from runtime API for active leaf",
 				);
-
-				// Not adding bookkeeping here might make us behave funny, but we
-				// shouldn't take down the node on spurious runtime API errors.
-				//
-				// and this is "behave funny" as in be bad at our job, but not in any
-				// slashable or security-related way.
-				continue;
 			}
-		};
-
-		state.relay_parent_state.insert(relay_parent, BlockBasedState {
-			known: HashMap::new(),
-			fetching: HashMap::new(),
-			n_validators: n_validators,
-		});
+		}
 	}
 
 	for relay_parent in deactivated {
+		state.connection_requests.remove(&relay_parent);
 		state.relay_parent_state.remove(&relay_parent);
 	}
 
@@ -197,7 +229,7 @@ async fn notify_all_we_are_awaiting(
 	ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
 		peers_to_send,
 		payload,
-	))).await
+	))).await;
 }
 
 /// Notify one peer about everything we're awaiting at a given relay-parent.
@@ -224,7 +256,7 @@ async fn notify_one_we_are_awaiting_many(
 	ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
 		vec![peer.clone()],
 		payload,
-	))).await
+	))).await;
 }
 
 /// Distribute a PoV to peers who are awaiting it.
@@ -262,6 +294,75 @@ async fn distribute_to_awaiting(
 	metrics.on_pov_distributed();
 }
 
+/// Get the index of the core that the given para is scheduled on, if any,
+/// along with the total number of cores.
+async fn determine_core(
+	ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
+	para_id: ParaId,
+	relay_parent: Hash,
+) -> error::Result<Option<(CoreIndex, usize)>> {
+	let cores = request_availability_cores_ctx(relay_parent, ctx).await?.await??;
+
+	for (idx, core) in cores.iter().enumerate() {
+		if let CoreState::Scheduled(scheduled) = core {
+			if scheduled.para_id == para_id {
+				return Ok(Some(((idx as u32).into(), cores.len())));
+			}
+		}
+	}
+
+	Ok(None)
+}
+
+/// Figure out a group of validators assigned to a given `ParaId`.
+async fn determine_validators_for_core(
+	ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
+	core_index: CoreIndex,
+	num_cores: usize,
+	relay_parent: Hash,
+) -> error::Result<Option<Vec<ValidatorId>>> {
+	let groups = request_validator_groups_ctx(relay_parent, ctx).await?.await??;
+
+	let group_index = groups.1.group_for_core(core_index, num_cores);
+
+	let connect_to_validators = match groups.0.get(group_index.0 as usize) {
+		Some(group) => group.clone(),
+		None => return Ok(None),
+	};
+
+	let validators = request_validators_ctx(relay_parent, ctx).await?.await??;
+
+	let validators = connect_to_validators
+		.into_iter()
+		.map(|idx| validators[idx as usize].clone())
+		.collect();
+
+	Ok(Some(validators))
+}
+
+async fn determine_relevant_validators(
+	ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
+	relay_parent: Hash,
+	para_id: ParaId,
+) -> error::Result<Option<Vec<ValidatorId>>> {
+	// Determine which core the para_id is assigned to.
+	let (core, num_cores) = match determine_core(ctx, para_id, relay_parent).await? {
+		Some(core) => core,
+		None => {
+			tracing::warn!(
+				target: LOG_TARGET,
+				"Looks like no core is assigned to {:?} at {:?}",
+				para_id,
+				relay_parent,
+			);
+
+			return Ok(None);
+		}
+	};
+
+	determine_validators_for_core(ctx, core, num_cores, relay_parent).await
+}
+
 /// Handles a `FetchPoV` message.
 #[tracing::instrument(level = "trace", skip(ctx, state, response_sender), fields(subsystem = LOG_TARGET))]
 async fn handle_fetch(
@@ -291,7 +392,35 @@ async fn handle_fetch(
 			return;
 		}
 		Entry::Vacant(e) => {
-			e.insert(vec![response_sender]);
+			if let Ok(Some(relevant_validators)) = determine_relevant_validators(
+				ctx,
+				relay_parent,
+				descriptor.para_id,
+			).await {
+				// We only need one connection request per relay parent, so take this
+				// shortcut to avoid calling `connect_to_validators` more than once
+				// for the same relay parent.
+				if !state.connection_requests.contains_request(&relay_parent) {
+					match validator_discovery::connect_to_validators(
+						ctx,
+						relay_parent,
+						relevant_validators.clone(),
+					).await {
+						Ok(new_connection_request) => {
+							state.connection_requests.put(relay_parent, new_connection_request);
+						}
+						Err(e) => {
+							tracing::debug!(
+								target: LOG_TARGET,
+								"Failed to create a validator connection request {:?}",
+								e,
+							);
+						}
+					}
+				}
+
+				e.insert(vec![response_sender]);
+			}
 		}
 	}
 }
@@ -482,6 +611,11 @@ async fn handle_incoming_pov(
 	).await
 }
 
+/// Handles a newly connected validator in the context of some relay leaf.
+fn handle_validator_connected(state: &mut State, peer_id: PeerId) {
+	state.peer_state.entry(peer_id).or_default();
+}
+
 /// Handles a network bridge update.
 #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
 async fn handle_network_update(
@@ -493,7 +627,7 @@ async fn handle_network_update(
 
 	match update {
 		NetworkBridgeEvent::PeerConnected(peer, _observed_role) => {
-			state.peer_state.insert(peer, PeerState { awaited: HashMap::new() });
+			handle_validator_connected(state, peer);
 		}
 		NetworkBridgeEvent::PeerDisconnected(peer) => {
 			state.peer_state.remove(&peer);
@@ -558,44 +692,61 @@ impl PoVDistribution {
 		self,
 		mut ctx: impl SubsystemContext<Message = PoVDistributionMessage>,
 	) -> SubsystemResult<()> {
-		let mut state = State {
-			relay_parent_state: HashMap::new(),
-			peer_state: HashMap::new(),
-			our_view: View(Vec::new()),
-			metrics: self.metrics,
-		};
+		let mut state = State::default();
+		state.metrics = self.metrics;
 
 		loop {
-			match ctx.recv().await? {
-				FromOverseer::Signal(signal) => if handle_signal(&mut state, &mut ctx, signal).await? {
-					return Ok(());
-				},
-				FromOverseer::Communication { msg } => match msg {
-					PoVDistributionMessage::FetchPoV(relay_parent, descriptor, response_sender) =>
-						handle_fetch(
-							&mut state,
-							&mut ctx,
-							relay_parent,
-							descriptor,
-							response_sender,
-						).await,
-					PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) =>
-						handle_distribute(
-							&mut state,
-							&mut ctx,
-							relay_parent,
-							descriptor,
-							pov,
-						).await,
-					PoVDistributionMessage::NetworkBridgeUpdateV1(event) =>
-						handle_network_update(
-							&mut state,
-							&mut ctx,
-							event,
-						).await,
-				},
-			}
+			// `select_biased` is used since receiving connection notifications and
+			// peer view update messages may be racy and we want connection notifications
+			// first.
+			futures::select_biased! {
+				v = state.connection_requests.next() => {
+					match v {
+						Some((_relay_parent, _validator_id, peer_id)) => {
+							handle_validator_connected(&mut state, peer_id);
+						}
+						None => break,
+					}
+				}
+				v = ctx.recv().fuse() => {
+					match v? {
+						FromOverseer::Signal(signal) => if handle_signal(
+							&mut state,
+							&mut ctx,
+							signal,
+						).await? {
+							return Ok(());
+						}
+						FromOverseer::Communication { msg } => match msg {
+							PoVDistributionMessage::FetchPoV(relay_parent, descriptor, response_sender) =>
+								handle_fetch(
+									&mut state,
+									&mut ctx,
+									relay_parent,
+									descriptor,
+									response_sender,
+								).await,
+							PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) =>
+								handle_distribute(
+									&mut state,
+									&mut ctx,
+									relay_parent,
+									descriptor,
+									pov,
+								).await,
+							PoVDistributionMessage::NetworkBridgeUpdateV1(event) =>
+								handle_network_update(
+									&mut state,
+									&mut ctx,
+									event,
+								).await,
+						}
+					}
+				}
+			};
 		}
+
+		Ok(())
 	}
 }

diff --git a/polkadot/node/network/pov-distribution/src/tests.rs b/polkadot/node/network/pov-distribution/src/tests.rs
index 03a39507a9..7c7859b869 100644
--- a/polkadot/node/network/pov-distribution/src/tests.rs
+++ b/polkadot/node/network/pov-distribution/src/tests.rs
@@ -1,7 +1,21 @@
 use super::*;
-use futures::executor;
-use polkadot_primitives::v1::BlockData;
+
+use std::time::Duration;
+
 use assert_matches::assert_matches;
+use futures::executor;
+use tracing::trace;
+use smallvec::smallvec;
+
+use sp_keyring::Sr25519Keyring;
+
+use polkadot_primitives::v1::{
+	AuthorityDiscoveryId, BlockData, CoreState, GroupRotationInfo, Id as ParaId,
+	ScheduledCore, ValidatorIndex,
+};
+use polkadot_subsystem::messages::{RuntimeApiMessage, RuntimeApiRequest};
+use polkadot_node_subsystem_test_helpers as test_helpers;
+use polkadot_node_subsystem_util::TimeoutExt;
 
 fn make_pov(data: Vec<u8>) -> PoV {
 	PoV { block_data: BlockData(data) }
@@ -15,6 +29,482 @@ fn make_peer_state(awaited: Vec<(Hash, Vec<Hash>)>)
 	}
 }
 
+fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
+	val_ids.iter().map(|v| v.public().into()).collect()
+}
+
+fn validator_authority_id(val_ids: &[Sr25519Keyring]) -> Vec<AuthorityDiscoveryId> {
+	val_ids.iter().map(|v| v.public().into()).collect()
+}
+
+struct TestHarness {
+	virtual_overseer: test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
+}
+
+fn test_harness<T: Future<Output = ()>>(
+	test: impl FnOnce(TestHarness) -> T,
+) {
+	let _ = env_logger::builder()
+		.is_test(true)
+		.filter(
+			Some("polkadot_pov_distribution"),
+			log::LevelFilter::Trace,
+		)
+		.filter(
+			Some(LOG_TARGET),
+			log::LevelFilter::Trace,
+		)
+		.try_init();
+
+	let pool = sp_core::testing::TaskExecutor::new();
+
+	let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
+
+	let subsystem = super::PoVDistribution::new(Metrics::default());
+
+	let subsystem = subsystem.run(context);
+
+	let test_fut = test(TestHarness { virtual_overseer });
+
+	futures::pin_mut!(test_fut);
+	futures::pin_mut!(subsystem);
+
+	executor::block_on(future::select(test_fut, subsystem));
+}
+
+const TIMEOUT: Duration = Duration::from_millis(100);
+
+async fn overseer_send(
+	overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
+	msg: PoVDistributionMessage,
+) {
+	trace!("Sending message:\n{:?}", &msg);
+	overseer
+		.send(FromOverseer::Communication { msg })
+		.timeout(TIMEOUT)
+		.await
+		.expect(&format!("{:?} is more than enough for sending messages.", TIMEOUT));
+}
+
+async fn overseer_recv(
+	overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
+) -> AllMessages {
+	let msg = overseer_recv_with_timeout(overseer, TIMEOUT)
+		.await
+		.expect(&format!("{:?} is more than enough to receive messages", TIMEOUT));
+
+	trace!("Received message:\n{:?}", &msg);
+
+	msg
+}
+
+async fn overseer_recv_with_timeout(
+	overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
+	timeout: Duration,
+) -> Option<AllMessages> {
+	trace!("Waiting for message...");
+	overseer
+		.recv()
+		.timeout(timeout)
+		.await
+}
+
+async fn overseer_signal(
+	overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
+	signal: OverseerSignal,
+) {
+	overseer
+		.send(FromOverseer::Signal(signal))
+		.timeout(TIMEOUT)
+		.await
+		.expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT));
+}
+
+#[derive(Clone)]
+struct TestState {
+	chain_ids: Vec<ParaId>,
+	validators: Vec<Sr25519Keyring>,
+	validator_public: Vec<ValidatorId>,
+	validator_authority_id: Vec<AuthorityDiscoveryId>,
+	validator_peer_id: Vec<PeerId>,
+	validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
+	relay_parent: Hash,
+	availability_cores: Vec<CoreState>,
+}
+
+impl Default for TestState {
+	fn default() -> Self {
+		let chain_a = ParaId::from(1);
+		let chain_b = ParaId::from(2);
+
+		let chain_ids = vec![chain_a, chain_b];
+
+		let validators = vec![
+			Sr25519Keyring::Alice,
+			Sr25519Keyring::Bob,
+			Sr25519Keyring::Charlie,
+			Sr25519Keyring::Dave,
+			Sr25519Keyring::Ferdie,
+		];
+
+		let validator_public = validator_pubkeys(&validators);
+		let validator_authority_id = validator_authority_id(&validators);
+
+		let validator_peer_id = std::iter::repeat_with(|| PeerId::random())
+			.take(validator_public.len())
+			.collect();
+
+		let validator_groups = vec![vec![2, 0, 4], vec![1], vec![3]];
+		let group_rotation_info = GroupRotationInfo {
+			session_start_block: 0,
+			group_rotation_frequency: 100,
+			now: 1,
+		};
+		let validator_groups = (validator_groups, group_rotation_info);
+
+		let availability_cores = vec![
+			CoreState::Scheduled(ScheduledCore {
+				para_id: chain_ids[0],
+				collator: None,
+			}),
+			CoreState::Scheduled(ScheduledCore {
+				para_id: chain_ids[1],
+				collator: None,
+			}),
+		];
+
+		let relay_parent = Hash::repeat_byte(0x05);
+
+		Self {
+			chain_ids,
+			validators,
+			validator_public,
+			validator_authority_id,
+			validator_peer_id,
+			validator_groups,
+			relay_parent,
+			availability_cores,
+		}
+	}
+}
+
+#[test]
+fn ask_validators_for_povs() {
+	let test_state = TestState::default();
+
+	test_harness(|test_harness| async move {
+		let mut virtual_overseer = test_harness.virtual_overseer;
+
+		let pov_block = PoV {
+			block_data: BlockData(vec![42, 43, 44]),
+		};
+
+		let pov_hash = pov_block.hash();
+
+		let mut candidate = CandidateDescriptor::default();
+
+		let current = test_state.relay_parent.clone();
+		candidate.para_id = test_state.chain_ids[0];
+		candidate.pov_hash = pov_hash;
+		candidate.relay_parent = test_state.relay_parent;
+
+		overseer_signal(
+			&mut virtual_overseer,
+			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
+				activated: smallvec![test_state.relay_parent.clone()],
+				deactivated: smallvec![],
+			}),
+		).await;
+
+		// First the subsystem will try to obtain validators.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::Validators(tx),
+			)) => {
+				assert_eq!(relay_parent, current);
+				tx.send(Ok(test_state.validator_public.clone())).unwrap();
+			}
+		);
+
+		let (tx, pov_fetch_result) = oneshot::channel();
+
+		overseer_send(
+			&mut virtual_overseer,
+			PoVDistributionMessage::FetchPoV(test_state.relay_parent.clone(), candidate, tx),
+		).await;
+
+		// Obtain the availability cores.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::AvailabilityCores(tx)
+			)) => {
+				assert_eq!(relay_parent, current);
+				tx.send(Ok(test_state.availability_cores.clone())).unwrap();
+			}
+		);
+
+		// Obtain the validator groups.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::ValidatorGroups(tx)
+			)) => {
+				assert_eq!(relay_parent, current);
+				tx.send(Ok(test_state.validator_groups.clone())).unwrap();
+			}
+		);
+
+		// Obtain the validators per relay parent.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::Validators(tx),
+			)) => {
+				assert_eq!(relay_parent, current);
+				tx.send(Ok(test_state.validator_public.clone())).unwrap();
+			}
+		);
+
+		// Obtain the validator_id to authority_id mapping.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::ValidatorDiscovery(validators, tx),
+			)) => {
+				assert_eq!(relay_parent, current);
+				assert_eq!(validators.len(), 3);
+				assert!(validators.iter().all(|v| test_state.validator_public.contains(&v)));
+
+				let result = vec![
+					Some(test_state.validator_authority_id[2].clone()),
+					Some(test_state.validator_authority_id[0].clone()),
+					Some(test_state.validator_authority_id[4].clone()),
+				];
+				tx.send(Ok(result)).unwrap();
+			}
+		);
+
+		// We now should connect to our validator group.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::NetworkBridge(
+				NetworkBridgeMessage::ConnectToValidators {
+					validator_ids,
+					mut connected,
+					..
+				}
+			) => {
+				assert_eq!(validator_ids.len(), 3);
+				assert!(validator_ids.iter().all(|id| test_state.validator_authority_id.contains(id)));
+
+				let result = vec![
+					(test_state.validator_authority_id[2].clone(), test_state.validator_peer_id[2].clone()),
+					(test_state.validator_authority_id[0].clone(), test_state.validator_peer_id[0].clone()),
+					(test_state.validator_authority_id[4].clone(), test_state.validator_peer_id[4].clone()),
+				];
+
+				result.into_iter().for_each(|r| connected.try_send(r).unwrap());
+			}
+		);
+
+		for i in vec![2, 0, 4] {
+			overseer_send(
+				&mut virtual_overseer,
+				PoVDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerViewChange(
+						test_state.validator_peer_id[i].clone(),
+						View(vec![current]),
+					)
+				)
+			).await;
+
+			assert_matches!(
+				overseer_recv(&mut virtual_overseer).await,
+				AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
+					to_peers,
+					payload,
+				)) => {
+					assert_eq!(to_peers, vec![test_state.validator_peer_id[i].clone()]);
+					assert_eq!(payload, awaiting_message(current.clone(), vec![pov_hash.clone()]));
+				}
+			);
+		}
+
+		overseer_send(
+			&mut virtual_overseer,
+			PoVDistributionMessage::NetworkBridgeUpdateV1(
+				NetworkBridgeEvent::PeerMessage(
+					test_state.validator_peer_id[2].clone(),
+					protocol_v1::PoVDistributionMessage::SendPoV(current, pov_hash, pov_block.clone()),
+				)
+			)
+		).await;
+
+		assert_eq!(*pov_fetch_result.await.unwrap(), pov_block);
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(id, benefit)) => {
+				assert_eq!(benefit, BENEFIT_FRESH_POV);
+				assert_eq!(id, test_state.validator_peer_id[2].clone());
+			}
+		);
+
+		// Now let's test that if some peer is ahead of us we would still
+		// send them an `Await` message on `FetchPoV`.
+		let next_leaf = Hash::repeat_byte(10);
+
+		// A validator's view changes and is now, let's say, ahead of us.
+		overseer_send(
+			&mut virtual_overseer,
+			PoVDistributionMessage::NetworkBridgeUpdateV1(
+				NetworkBridgeEvent::PeerViewChange(
+					test_state.validator_peer_id[2].clone(),
+					View(vec![next_leaf]),
+				)
+			)
+		).await;
+
+		let pov_block = PoV {
+			block_data: BlockData(vec![45, 46, 47]),
+		};
+
+		let pov_hash = pov_block.hash();
+
+		let candidate = CandidateDescriptor {
+			para_id: test_state.chain_ids[0],
+			pov_hash,
+			relay_parent: next_leaf.clone(),
+			..Default::default()
+		};
+
+		let (tx, _pov_fetch_result) = oneshot::channel();
+
+		overseer_signal(
+			&mut virtual_overseer,
+			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
+				activated: smallvec![next_leaf.clone()],
+				deactivated: smallvec![current.clone()],
+			})
+		).await;
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::Validators(tx),
+			)) => {
+				assert_eq!(relay_parent, next_leaf);
+				tx.send(Ok(test_state.validator_public.clone())).unwrap();
+			}
+		);
+
+		overseer_send(
+			&mut virtual_overseer,
+			PoVDistributionMessage::FetchPoV(next_leaf.clone(), candidate, tx),
+		).await;
+
+		// Obtain the availability cores.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::AvailabilityCores(tx)
+			)) => {
+				assert_eq!(relay_parent, next_leaf);
+				tx.send(Ok(test_state.availability_cores.clone())).unwrap();
+			}
+		);
+
+		// Obtain the validator groups.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::ValidatorGroups(tx)
+			)) => {
+				assert_eq!(relay_parent, next_leaf);
+				tx.send(Ok(test_state.validator_groups.clone())).unwrap();
+			}
+		);
+
+		// Obtain the validators per relay parent.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::Validators(tx),
+			)) => {
+				assert_eq!(relay_parent, next_leaf);
+				tx.send(Ok(test_state.validator_public.clone())).unwrap();
+			}
+		);
+
+		// Obtain the validator_id to authority_id mapping.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::ValidatorDiscovery(validators, tx),
+			)) => {
+				assert_eq!(relay_parent, next_leaf);
+				assert_eq!(validators.len(), 3);
+				assert!(validators.iter().all(|v| test_state.validator_public.contains(&v)));
+
+				let result = vec![
+					Some(test_state.validator_authority_id[2].clone()),
+					Some(test_state.validator_authority_id[0].clone()),
+					Some(test_state.validator_authority_id[4].clone()),
+				];
+				tx.send(Ok(result)).unwrap();
+			}
+		);
+
+		// We now should connect to our validator group.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::NetworkBridge(
+				NetworkBridgeMessage::ConnectToValidators {
+					validator_ids,
+					mut connected,
+					..
+				}
+			) => {
+				assert_eq!(validator_ids.len(), 3);
+				assert!(validator_ids.iter().all(|id| test_state.validator_authority_id.contains(id)));
+
+				let result = vec![
+					(test_state.validator_authority_id[2].clone(), test_state.validator_peer_id[2].clone()),
+					(test_state.validator_authority_id[0].clone(), test_state.validator_peer_id[0].clone()),
+					(test_state.validator_authority_id[4].clone(), test_state.validator_peer_id[4].clone()),
+				];
+
+				result.into_iter().for_each(|r| connected.try_send(r).unwrap());
+			}
+		);
+
+		// We already know that the leaf in question is in the peer's view, so we
+		// request the PoV from them right away.
+ assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + to_peers, + payload, + )) => { + assert_eq!(to_peers, vec![test_state.validator_peer_id[2].clone()]); + assert_eq!(payload, awaiting_message(next_leaf.clone(), vec![pov_hash.clone()])); + } + ); + }); +} + #[test] fn distributes_to_those_awaiting_and_completes_local() { let hash_a: Hash = [0; 32].into(); @@ -66,6 +556,7 @@ fn distributes_to_those_awaiting_and_completes_local() { }, our_view: View(vec![hash_a, hash_b]), metrics: Default::default(), + connection_requests: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -103,8 +594,10 @@ fn distributes_to_those_awaiting_and_completes_local() { }); } + #[test] fn we_inform_peers_with_same_view_we_are_awaiting() { + let hash_a: Hash = [0; 32].into(); let hash_b: Hash = [1; 32].into(); @@ -146,6 +639,7 @@ fn we_inform_peers_with_same_view_we_are_awaiting() { }, our_view: View(vec![hash_a]), metrics: Default::default(), + connection_requests: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -153,29 +647,135 @@ fn we_inform_peers_with_same_view_we_are_awaiting() { let mut descriptor = CandidateDescriptor::default(); descriptor.pov_hash = pov_hash; + let para_id_1 = ParaId::from(1); + let para_id_2 = ParaId::from(2); + + descriptor.para_id = para_id_1; + + let availability_cores = vec![ + CoreState::Scheduled(ScheduledCore { + para_id: para_id_1, + collator: None, + }), + CoreState::Scheduled(ScheduledCore { + para_id: para_id_2, + collator: None, + }), + ]; + + let validators = vec![ + Sr25519Keyring::Alice, + Sr25519Keyring::Bob, + Sr25519Keyring::Charlie, + Sr25519Keyring::Dave, + Sr25519Keyring::Ferdie, + ]; + + let validator_authority_id = validator_authority_id(&validators); + let validators = validator_pubkeys(&validators); + + let validator_peer_id: Vec<_> = std::iter::repeat_with(|| PeerId::random()) + .take(validators.len()) + .collect(); + + let validator_groups = vec![vec![2, 0, 4], vec![1], vec![3]]; + let group_rotation_info = GroupRotationInfo { + session_start_block: 0, + group_rotation_frequency: 100, + now: 1, + }; + + let validator_groups = (validator_groups, group_rotation_info); + executor::block_on(async move { - handle_fetch( + let handle_future = handle_fetch( &mut state, &mut ctx, hash_a, descriptor, pov_send, - ).await; + ); - assert_eq!(state.relay_parent_state[&hash_a].fetching[&pov_hash].len(), 1); + let check_future = async move { + //assert_eq!(state.relay_parent_state[&hash_a].fetching[&pov_hash].len(), 1); + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::AvailabilityCores(tx) + )) => { + assert_eq!(relay_parent, hash_a); + tx.send(Ok(availability_cores)).unwrap(); + } + ); - assert_matches!( - handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage(peers, message) - ) => { - assert_eq!(peers, vec![peer_a.clone()]); - assert_eq!( - message, - awaiting_message(hash_a, vec![pov_hash]), - ); - } - ) + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::ValidatorGroups(tx) + )) => { + assert_eq!(relay_parent, hash_a); + tx.send(Ok(validator_groups)).unwrap(); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(tx), + )) => { + 
assert_eq!(relay_parent, hash_a); + tx.send(Ok(validators.clone())).unwrap(); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::ValidatorDiscovery(validators_res, tx), + )) => { + assert_eq!(relay_parent, hash_a); + assert_eq!(validators_res.len(), 3); + assert!(validators_res.iter().all(|v| validators.contains(&v))); + + let result = vec![ + Some(validator_authority_id[2].clone()), + Some(validator_authority_id[0].clone()), + Some(validator_authority_id[4].clone()), + ]; + + tx.send(Ok(result)).unwrap(); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ConnectToValidators { + validator_ids, + mut connected, + .. + } + ) => { + assert_eq!(validator_ids.len(), 3); + assert!(validator_ids.iter().all(|id| validator_authority_id.contains(id))); + + let result = vec![ + (validator_authority_id[2].clone(), validator_peer_id[2].clone()), + (validator_authority_id[0].clone(), validator_peer_id[0].clone()), + (validator_authority_id[4].clone(), validator_peer_id[4].clone()), + ]; + + result.into_iter().for_each(|r| connected.try_send(r).unwrap()); + } + ); + + }; + + futures::join!(handle_future, check_future); }); } @@ -224,6 +824,7 @@ fn peer_view_change_leads_to_us_informing() { }, our_view: View(vec![hash_a]), metrics: Default::default(), + connection_requests: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -296,6 +897,7 @@ fn peer_complete_fetch_and_is_rewarded() { }, our_view: View(vec![hash_a]), metrics: Default::default(), + connection_requests: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -385,6 +987,7 @@ fn peer_punished_for_sending_bad_pov() { }, our_view: View(vec![hash_a]), metrics: Default::default(), + connection_requests: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -449,6 +1052,7 @@ fn peer_punished_for_sending_unexpected_pov() { }, our_view: View(vec![hash_a]), metrics: Default::default(), + connection_requests: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -511,6 +1115,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() { }, our_view: View(vec![hash_a]), metrics: Default::default(), + connection_requests: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -570,6 +1175,7 @@ fn peer_reported_for_awaiting_too_much() { }, our_view: View(vec![hash_a]), metrics: Default::default(), + connection_requests: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -656,6 +1262,7 @@ fn peer_reported_for_awaiting_outside_their_view() { }, our_view: View(vec![hash_a, hash_b]), metrics: Default::default(), + connection_requests: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -719,6 +1326,7 @@ fn peer_reported_for_awaiting_outside_our_view() { }, our_view: View(vec![hash_a]), metrics: Default::default(), + connection_requests: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -797,6 +1405,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() { }, our_view: View(vec![hash_a]), metrics: Default::default(), + connection_requests: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -880,6 +1489,7 @@ fn peer_completing_request_no_longer_awaiting() { }, our_view: View(vec![hash_a]), metrics: Default::default(), + connection_requests: Default::default(), }; let pool = 
sp_core::testing::TaskExecutor::new();

diff --git a/polkadot/node/subsystem-util/src/validator_discovery.rs b/polkadot/node/subsystem-util/src/validator_discovery.rs
index 83c405a3f4..357041d96b 100644
--- a/polkadot/node/subsystem-util/src/validator_discovery.rs
+++ b/polkadot/node/subsystem-util/src/validator_discovery.rs
@@ -115,6 +115,12 @@ pub struct ConnectionRequests {
 	requests: StreamUnordered<ConnectionRequest>,
 }
 
+impl stream::FusedStream for ConnectionRequests {
+	fn is_terminated(&self) -> bool {
+		false
+	}
+}
+
 impl ConnectionRequests {
 	/// Insert a new connection request.
 	///
@@ -133,6 +139,11 @@ impl ConnectionRequests {
 			Pin::new(&mut self.requests).remove(token);
 		}
 	}
+
+	/// Is a connection request at this relay parent already present?
+	pub fn contains_request(&self, relay_parent: &Hash) -> bool {
+		self.id_map.contains_key(relay_parent)
+	}
 }
 
 impl stream::Stream for ConnectionRequests {
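
The `FusedStream` impl above deliberately reports `is_terminated() == false`, which is what lets `state.connection_requests.next()` sit directly inside the `futures::select_biased!` loop added in lib.rs: the stream arm keeps being polled even while no connection requests are live, and because it is listed first, connection notifications win any race against overseer messages. Below is a minimal, self-contained sketch of that pattern (assuming only `futures = "0.3"`; the `Connections` type and the channels are illustrative stand-ins, not the real `ConnectionRequests` API):

use futures::channel::mpsc;
use futures::stream::{FusedStream, Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};

// Illustrative stand-in for `ConnectionRequests`: a stream of newly
// connected peer ids, backed here by a plain unbounded channel.
struct Connections(mpsc::UnboundedReceiver<u64>);

impl Stream for Connections {
	type Item = u64;
	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u64>> {
		Pin::new(&mut self.0).poll_next(cx)
	}
}

// As in the patch: never report termination, so `select_biased!` keeps
// polling this arm even while no connection requests are in flight.
impl FusedStream for Connections {
	fn is_terminated(&self) -> bool {
		false
	}
}

fn main() {
	futures::executor::block_on(async {
		let (conn_tx, conn_rx) = mpsc::unbounded();
		let (msg_tx, mut msg_rx) = mpsc::unbounded::<&str>();
		let mut connections = Connections(conn_rx);

		conn_tx.unbounded_send(7).unwrap();
		msg_tx.unbounded_send("view-update").unwrap();
		drop(msg_tx);

		loop {
			// Biased order: the connection arm is polled first, mirroring
			// the subsystem's run loop where connection notifications must
			// be observed before racy peer view updates.
			futures::select_biased! {
				peer = connections.next() => match peer {
					Some(p) => println!("connected: {}", p),
					None => break,
				},
				msg = msg_rx.next() => match msg {
					Some(m) => println!("overseer message: {}", m),
					None => break,
				},
			}
		}
	});
}

The sketch prints the connection before the queued message even though both were sent up front; without the always-false `is_terminated`, an empty request stream would be treated as finished and skipped by the macro on subsequent iterations.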