diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index a78cf0038d..97f4c52ab9 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -5202,13 +5202,18 @@ name = "polkadot-pov-distribution"
version = "0.1.0"
dependencies = [
"assert_matches",
+ "env_logger 0.8.2",
"futures 0.3.8",
+ "log",
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
+ "smallvec 1.5.0",
"sp-core",
+ "sp-keyring",
+ "thiserror",
"tracing",
"tracing-futures",
]
diff --git a/polkadot/node/network/pov-distribution/Cargo.toml b/polkadot/node/network/pov-distribution/Cargo.toml
index 6ef2d10a56..6f2bc640b0 100644
--- a/polkadot/node/network/pov-distribution/Cargo.toml
+++ b/polkadot/node/network/pov-distribution/Cargo.toml
@@ -6,8 +6,10 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
+thiserror = "1.0.21"
tracing = "0.1.22"
tracing-futures = "0.2.4"
+
polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
@@ -15,5 +17,11 @@ polkadot-node-network-protocol = { path = "../../network/protocol" }
[dev-dependencies]
assert_matches = "1.4.0"
+env_logger = "0.8.1"
+log = "0.4.11"
+smallvec = "1.4.2"
+
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
diff --git a/polkadot/node/network/pov-distribution/src/error.rs b/polkadot/node/network/pov-distribution/src/error.rs
new file mode 100644
index 0000000000..625ea4c7fd
--- /dev/null
+++ b/polkadot/node/network/pov-distribution/src/error.rs
@@ -0,0 +1,35 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! The `Error` and `Result` types used by the subsystem.
+
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error(transparent)]
+ Subsystem(#[from] polkadot_subsystem::SubsystemError),
+ #[error(transparent)]
+ OneshotRecv(#[from] futures::channel::oneshot::Canceled),
+ #[error(transparent)]
+ Runtime(#[from] polkadot_subsystem::errors::RuntimeApiError),
+ #[error(transparent)]
+ ValidatorDiscovery(#[from] polkadot_node_subsystem_util::validator_discovery::Error),
+ #[error(transparent)]
+ Util(#[from] polkadot_node_subsystem_util::Error),
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs
index 01b86b44a0..1b59441fc0 100644
--- a/polkadot/node/network/pov-distribution/src/lib.rs
+++ b/polkadot/node/network/pov-distribution/src/lib.rs
@@ -22,16 +22,26 @@
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]
-use polkadot_primitives::v1::{Hash, PoV, CandidateDescriptor};
+use polkadot_primitives::v1::{
+ Hash, PoV, CandidateDescriptor, ValidatorId, Id as ParaId, CoreIndex, CoreState,
+};
use polkadot_subsystem::{
- ActiveLeavesUpdate, OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, SubsystemError,
+ ActiveLeavesUpdate, OverseerSignal, SubsystemContext, SubsystemResult, SubsystemError, Subsystem,
FromOverseer, SpawnedSubsystem,
messages::{
- PoVDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, AllMessages, NetworkBridgeMessage,
+ PoVDistributionMessage, AllMessages, NetworkBridgeMessage,
},
};
-use polkadot_node_subsystem_util::metrics::{self, prometheus};
-use polkadot_node_network_protocol::{v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View};
+use polkadot_node_subsystem_util::{
+ validator_discovery,
+ request_validators_ctx,
+ request_validator_groups_ctx,
+ request_availability_cores_ctx,
+ metrics::{self, prometheus},
+};
+use polkadot_node_network_protocol::{
+ v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View,
+};
use futures::prelude::*;
use futures::channel::oneshot;
@@ -39,6 +49,8 @@ use futures::channel::oneshot;
use std::collections::{hash_map::{Entry, HashMap}, HashSet};
use std::sync::Arc;
+mod error;
+
#[cfg(test)]
mod tests;
@@ -75,20 +87,33 @@ impl<C> Subsystem<C> for PoVDistribution
}
}
+#[derive(Default)]
struct State {
+ /// A state of things going on on a per-relay-parent basis.
relay_parent_state: HashMap<Hash, BlockBasedState>,
+
+ /// Info on peers.
peer_state: HashMap<PeerId, PeerState>,
+
+ /// Our own view.
our_view: View,
+
+ /// Connect to relevant groups of validators at different relay parents.
+ connection_requests: validator_discovery::ConnectionRequests,
+
+ /// Metrics.
metrics: Metrics,
}
struct BlockBasedState {
known: HashMap<Hash, Arc<PoV>>,
+
/// All the PoVs we are or were fetching, coupled with channels expecting the data.
///
/// This may be an empty list, which indicates that we were once awaiting this PoV but have
/// received it already.
fetching: HashMap<Hash, Vec<oneshot::Sender<Arc<PoV>>>>,
+
n_validators: usize,
}
@@ -128,38 +153,45 @@ async fn handle_signal(
let _timer = state.metrics.time_handle_signal();
for relay_parent in activated {
- let (vals_tx, vals_rx) = oneshot::channel();
- ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- relay_parent,
- RuntimeApiRequest::Validators(vals_tx),
- ))).await;
+ match request_validators_ctx(relay_parent.clone(), ctx).await {
+ Ok(vals_rx) => {
+ let n_validators = match vals_rx.await? {
+ Ok(v) => v.len(),
+ Err(e) => {
+ tracing::warn!(
+ target: LOG_TARGET,
+ err = ?e,
+ "Error fetching validators from runtime API for active leaf",
+ );
- let n_validators = match vals_rx.await? {
- Ok(v) => v.len(),
+ // Not adding bookkeeping here might make us behave funny, but we
+ // shouldn't take down the node on spurious runtime API errors.
+ //
+ // and this is "behave funny" as in be bad at our job, but not in any
+ // slashable or security-related way.
+ continue;
+ }
+ };
+
+ state.relay_parent_state.insert(relay_parent, BlockBasedState {
+ known: HashMap::new(),
+ fetching: HashMap::new(),
+ n_validators,
+ });
+ }
Err(e) => {
+ // continue here also as above.
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Error fetching validators from runtime API for active leaf",
);
-
- // Not adding bookkeeping here might make us behave funny, but we
- // shouldn't take down the node on spurious runtime API errors.
- //
- // and this is "behave funny" as in be bad at our job, but not in any
- // slashable or security-related way.
- continue;
}
- };
-
- state.relay_parent_state.insert(relay_parent, BlockBasedState {
- known: HashMap::new(),
- fetching: HashMap::new(),
- n_validators: n_validators,
- });
+ }
}
for relay_parent in deactivated {
+ state.connection_requests.remove(&relay_parent);
state.relay_parent_state.remove(&relay_parent);
}
@@ -197,7 +229,7 @@ async fn notify_all_we_are_awaiting(
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send,
payload,
- ))).await
+ ))).await;
}
/// Notify one peer about everything we're awaiting at a given relay-parent.
@@ -224,7 +256,7 @@ async fn notify_one_we_are_awaiting_many(
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
vec![peer.clone()],
payload,
- ))).await
+ ))).await;
}
/// Distribute a PoV to peers who are awaiting it.
@@ -262,6 +294,75 @@ async fn distribute_to_awaiting(
metrics.on_pov_distributed();
}
+/// Get the Id of the Core that is assigned to the para being collated on if any
+/// and the total number of cores.
+async fn determine_core(
+ ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
+ para_id: ParaId,
+ relay_parent: Hash,
+) -> error::Result<Option<(CoreIndex, usize)>> {
+ let cores = request_availability_cores_ctx(relay_parent, ctx).await?.await??;
+
+ for (idx, core) in cores.iter().enumerate() {
+ if let CoreState::Scheduled(occupied) = core {
+ if occupied.para_id == para_id {
+ return Ok(Some(((idx as u32).into(), cores.len())));
+ }
+ }
+ }
+
+ Ok(None)
+}
+
+/// Figure out a group of validators assigned to a given `ParaId`.
+async fn determine_validators_for_core(
+ ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
+ core_index: CoreIndex,
+ num_cores: usize,
+ relay_parent: Hash,
+) -> error::Result<Option<Vec<ValidatorId>>> {
+ let groups = request_validator_groups_ctx(relay_parent, ctx).await?.await??;
+
+ let group_index = groups.1.group_for_core(core_index, num_cores);
+
+ let connect_to_validators = match groups.0.get(group_index.0 as usize) {
+ Some(group) => group.clone(),
+ None => return Ok(None),
+ };
+
+ let validators = request_validators_ctx(relay_parent, ctx).await?.await??;
+
+ let validators = connect_to_validators
+ .into_iter()
+ .map(|idx| validators[idx as usize].clone())
+ .collect();
+
+ Ok(Some(validators))
+}
+
+async fn determine_relevant_validators(
+ ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
+ relay_parent: Hash,
+ para_id: ParaId,
+) -> error::Result<Option<Vec<ValidatorId>>> {
+ // Determine which core the para_id is assigned to.
+ let (core, num_cores) = match determine_core(ctx, para_id, relay_parent).await? {
+ Some(core) => core,
+ None => {
+ tracing::warn!(
+ target: LOG_TARGET,
+ "Looks like no core is assigned to {:?} at {:?}",
+ para_id,
+ relay_parent,
+ );
+
+ return Ok(None);
+ }
+ };
+
+ determine_validators_for_core(ctx, core, num_cores, relay_parent).await
+}
+
/// Handles a `FetchPoV` message.
#[tracing::instrument(level = "trace", skip(ctx, state, response_sender), fields(subsystem = LOG_TARGET))]
async fn handle_fetch(
@@ -291,7 +392,35 @@ async fn handle_fetch(
return;
}
Entry::Vacant(e) => {
- e.insert(vec![response_sender]);
+ if let Ok(Some(relevant_validators)) = determine_relevant_validators(
+ ctx,
+ relay_parent,
+ descriptor.para_id,
+ ).await {
+ // We only need one connection request per (relay_parent, para_id)
+ // so here we take this shortcut to avoid calling `connect_to_validators`
+ // more than once.
+ if !state.connection_requests.contains_request(&relay_parent) {
+ match validator_discovery::connect_to_validators(
+ ctx,
+ relay_parent,
+ relevant_validators.clone(),
+ ).await {
+ Ok(new_connection_request) => {
+ state.connection_requests.put(relay_parent, new_connection_request);
+ }
+ Err(e) => {
+ tracing::debug!(
+ target: LOG_TARGET,
+ "Failed to create a validator connection request {:?}",
+ e,
+ );
+ }
+ }
+ }
+
+ e.insert(vec![response_sender]);
+ }
}
}
}
@@ -482,6 +611,11 @@ async fn handle_incoming_pov(
).await
}
+/// Handles a newly connected validator in the context of some relay leaf.
+fn handle_validator_connected(state: &mut State, peer_id: PeerId) {
+ state.peer_state.entry(peer_id).or_default();
+}
+
/// Handles a network bridge update.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_network_update(
@@ -493,7 +627,7 @@ async fn handle_network_update(
match update {
NetworkBridgeEvent::PeerConnected(peer, _observed_role) => {
- state.peer_state.insert(peer, PeerState { awaited: HashMap::new() });
+ handle_validator_connected(state, peer);
}
NetworkBridgeEvent::PeerDisconnected(peer) => {
state.peer_state.remove(&peer);
@@ -558,44 +692,61 @@ impl PoVDistribution {
self,
mut ctx: impl SubsystemContext,
) -> SubsystemResult<()> {
- let mut state = State {
- relay_parent_state: HashMap::new(),
- peer_state: HashMap::new(),
- our_view: View(Vec::new()),
- metrics: self.metrics,
- };
+ let mut state = State::default();
+ state.metrics = self.metrics;
loop {
- match ctx.recv().await? {
- FromOverseer::Signal(signal) => if handle_signal(&mut state, &mut ctx, signal).await? {
- return Ok(());
- },
- FromOverseer::Communication { msg } => match msg {
- PoVDistributionMessage::FetchPoV(relay_parent, descriptor, response_sender) =>
- handle_fetch(
+ // `select_biased` is used since receiving connection notifications and
+ // peer view update messages may be racy and we want connection notifications
+ // first.
+ futures::select_biased! {
+ v = state.connection_requests.next() => {
+ match v {
+ Some((_relay_parent, _validator_id, peer_id)) => {
+ handle_validator_connected(&mut state, peer_id);
+ }
+ None => break,
+ }
+ }
+ v = ctx.recv().fuse() => {
+ match v? {
+ FromOverseer::Signal(signal) => if handle_signal(
&mut state,
&mut ctx,
- relay_parent,
- descriptor,
- response_sender,
- ).await,
- PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) =>
- handle_distribute(
- &mut state,
- &mut ctx,
- relay_parent,
- descriptor,
- pov,
- ).await,
- PoVDistributionMessage::NetworkBridgeUpdateV1(event) =>
- handle_network_update(
- &mut state,
- &mut ctx,
- event,
- ).await,
- },
- }
+ signal,
+ ).await? {
+ return Ok(());
+ }
+ FromOverseer::Communication { msg } => match msg {
+ PoVDistributionMessage::FetchPoV(relay_parent, descriptor, response_sender) =>
+ handle_fetch(
+ &mut state,
+ &mut ctx,
+ relay_parent,
+ descriptor,
+ response_sender,
+ ).await,
+ PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) =>
+ handle_distribute(
+ &mut state,
+ &mut ctx,
+ relay_parent,
+ descriptor,
+ pov,
+ ).await,
+ PoVDistributionMessage::NetworkBridgeUpdateV1(event) =>
+ handle_network_update(
+ &mut state,
+ &mut ctx,
+ event,
+ ).await,
+ }
+ }
+ }
+ };
}
+
+ Ok(())
}
}
diff --git a/polkadot/node/network/pov-distribution/src/tests.rs b/polkadot/node/network/pov-distribution/src/tests.rs
index 03a39507a9..7c7859b869 100644
--- a/polkadot/node/network/pov-distribution/src/tests.rs
+++ b/polkadot/node/network/pov-distribution/src/tests.rs
@@ -1,7 +1,21 @@
use super::*;
-use futures::executor;
-use polkadot_primitives::v1::BlockData;
+
+use std::time::Duration;
+
use assert_matches::assert_matches;
+use futures::executor;
+use tracing::trace;
+use smallvec::smallvec;
+
+use sp_keyring::Sr25519Keyring;
+
+use polkadot_primitives::v1::{
+ AuthorityDiscoveryId, BlockData, CoreState, GroupRotationInfo, Id as ParaId,
+ ScheduledCore, ValidatorIndex,
+};
+use polkadot_subsystem::messages::{RuntimeApiMessage, RuntimeApiRequest};
+use polkadot_node_subsystem_test_helpers as test_helpers;
+use polkadot_node_subsystem_util::TimeoutExt;
fn make_pov(data: Vec<u8>) -> PoV {
PoV { block_data: BlockData(data) }
@@ -15,6 +29,482 @@ fn make_peer_state(awaited: Vec<(Hash, Vec<Hash>)>)
}
}
+fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
+ val_ids.iter().map(|v| v.public().into()).collect()
+}
+
+fn validator_authority_id(val_ids: &[Sr25519Keyring]) -> Vec<AuthorityDiscoveryId> {
+ val_ids.iter().map(|v| v.public().into()).collect()
+}
+
+struct TestHarness {
+ virtual_overseer: test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
+}
+
+fn test_harness<T: Future<Output = ()>>(
+ test: impl FnOnce(TestHarness) -> T,
+) {
+ let _ = env_logger::builder()
+ .is_test(true)
+ .filter(
+ Some("polkadot_pov_distribution"),
+ log::LevelFilter::Trace,
+ )
+ .filter(
+ Some(LOG_TARGET),
+ log::LevelFilter::Trace,
+ )
+ .try_init();
+
+ let pool = sp_core::testing::TaskExecutor::new();
+
+ let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
+
+ let subsystem = super::PoVDistribution::new(Metrics::default());
+
+ let subsystem = subsystem.run(context);
+
+ let test_fut = test(TestHarness { virtual_overseer });
+
+ futures::pin_mut!(test_fut);
+ futures::pin_mut!(subsystem);
+
+ executor::block_on(future::select(test_fut, subsystem));
+}
+
+const TIMEOUT: Duration = Duration::from_millis(100);
+
+async fn overseer_send(
+ overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
+ msg: PoVDistributionMessage,
+) {
+ trace!("Sending message:\n{:?}", &msg);
+ overseer
+ .send(FromOverseer::Communication { msg })
+ .timeout(TIMEOUT)
+ .await
+ .expect(&format!("{:?} is more than enough for sending messages.", TIMEOUT));
+}
+
+async fn overseer_recv(
+ overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
+) -> AllMessages {
+ let msg = overseer_recv_with_timeout(overseer, TIMEOUT)
+ .await
+ .expect(&format!("{:?} is more than enough to receive messages", TIMEOUT));
+
+ trace!("Received message:\n{:?}", &msg);
+
+ msg
+}
+
+async fn overseer_recv_with_timeout(
+ overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
+ timeout: Duration,
+) -> Option<AllMessages> {
+ trace!("Waiting for message...");
+ overseer
+ .recv()
+ .timeout(timeout)
+ .await
+}
+
+async fn overseer_signal(
+ overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
+ signal: OverseerSignal,
+) {
+ overseer
+ .send(FromOverseer::Signal(signal))
+ .timeout(TIMEOUT)
+ .await
+ .expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT));
+}
+
+#[derive(Clone)]
+struct TestState {
+ chain_ids: Vec<ParaId>,
+ validators: Vec<Sr25519Keyring>,
+ validator_public: Vec<ValidatorId>,
+ validator_authority_id: Vec<AuthorityDiscoveryId>,
+ validator_peer_id: Vec<PeerId>,
+ validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
+ relay_parent: Hash,
+ availability_cores: Vec<CoreState>,
+}
+
+impl Default for TestState {
+ fn default() -> Self {
+ let chain_a = ParaId::from(1);
+ let chain_b = ParaId::from(2);
+
+ let chain_ids = vec![chain_a, chain_b];
+
+ let validators = vec![
+ Sr25519Keyring::Alice,
+ Sr25519Keyring::Bob,
+ Sr25519Keyring::Charlie,
+ Sr25519Keyring::Dave,
+ Sr25519Keyring::Ferdie,
+ ];
+
+ let validator_public = validator_pubkeys(&validators);
+ let validator_authority_id = validator_authority_id(&validators);
+
+ let validator_peer_id = std::iter::repeat_with(|| PeerId::random())
+ .take(validator_public.len())
+ .collect();
+
+ let validator_groups = vec![vec![2, 0, 4], vec![1], vec![3]];
+ let group_rotation_info = GroupRotationInfo {
+ session_start_block: 0,
+ group_rotation_frequency: 100,
+ now: 1,
+ };
+ let validator_groups = (validator_groups, group_rotation_info);
+
+ let availability_cores = vec![
+ CoreState::Scheduled(ScheduledCore {
+ para_id: chain_ids[0],
+ collator: None,
+ }),
+ CoreState::Scheduled(ScheduledCore {
+ para_id: chain_ids[1],
+ collator: None,
+ }),
+ ];
+
+ let relay_parent = Hash::repeat_byte(0x05);
+
+ Self {
+ chain_ids,
+ validators,
+ validator_public,
+ validator_authority_id,
+ validator_peer_id,
+ validator_groups,
+ relay_parent,
+ availability_cores,
+ }
+ }
+}
+
+#[test]
+fn ask_validators_for_povs() {
+ let test_state = TestState::default();
+
+ test_harness(|test_harness| async move {
+ let mut virtual_overseer = test_harness.virtual_overseer;
+
+ let pov_block = PoV {
+ block_data: BlockData(vec![42, 43, 44]),
+ };
+
+ let pov_hash = pov_block.hash();
+
+ let mut candidate = CandidateDescriptor::default();
+
+ let current = test_state.relay_parent.clone();
+ candidate.para_id = test_state.chain_ids[0];
+ candidate.pov_hash = pov_hash;
+ candidate.relay_parent = test_state.relay_parent;
+
+ overseer_signal(
+ &mut virtual_overseer,
+ OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
+ activated: smallvec![test_state.relay_parent.clone()],
+ deactivated: smallvec![],
+ }),
+ ).await;
+
+ // first subsystem will try to obtain validators.
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ relay_parent,
+ RuntimeApiRequest::Validators(tx),
+ )) => {
+ assert_eq!(relay_parent, current);
+ tx.send(Ok(test_state.validator_public.clone())).unwrap();
+ }
+ );
+
+ let (tx, pov_fetch_result) = oneshot::channel();
+
+ overseer_send(
+ &mut virtual_overseer,
+ PoVDistributionMessage::FetchPoV(test_state.relay_parent.clone(), candidate, tx),
+ ).await;
+
+ // obtain the availability cores.
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ relay_parent,
+ RuntimeApiRequest::AvailabilityCores(tx)
+ )) => {
+ assert_eq!(relay_parent, current);
+ tx.send(Ok(test_state.availability_cores.clone())).unwrap();
+ }
+ );
+
+ // Obtain the validator groups
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ relay_parent,
+ RuntimeApiRequest::ValidatorGroups(tx)
+ )) => {
+ assert_eq!(relay_parent, current);
+ tx.send(Ok(test_state.validator_groups.clone())).unwrap();
+ }
+ );
+
+ // obtain the validators per relay parent
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ relay_parent,
+ RuntimeApiRequest::Validators(tx),
+ )) => {
+ assert_eq!(relay_parent, current);
+ tx.send(Ok(test_state.validator_public.clone())).unwrap();
+ }
+ );
+
+ // obtain the validator_id to authority_id mapping
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ relay_parent,
+ RuntimeApiRequest::ValidatorDiscovery(validators, tx),
+ )) => {
+ assert_eq!(relay_parent, current);
+ assert_eq!(validators.len(), 3);
+ assert!(validators.iter().all(|v| test_state.validator_public.contains(&v)));
+
+ let result = vec![
+ Some(test_state.validator_authority_id[2].clone()),
+ Some(test_state.validator_authority_id[0].clone()),
+ Some(test_state.validator_authority_id[4].clone()),
+ ];
+ tx.send(Ok(result)).unwrap();
+ }
+ );
+
+ // We now should connect to our validator group.
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::NetworkBridge(
+ NetworkBridgeMessage::ConnectToValidators {
+ validator_ids,
+ mut connected,
+ ..
+ }
+ ) => {
+ assert_eq!(validator_ids.len(), 3);
+ assert!(validator_ids.iter().all(|id| test_state.validator_authority_id.contains(id)));
+
+ let result = vec![
+ (test_state.validator_authority_id[2].clone(), test_state.validator_peer_id[2].clone()),
+ (test_state.validator_authority_id[0].clone(), test_state.validator_peer_id[0].clone()),
+ (test_state.validator_authority_id[4].clone(), test_state.validator_peer_id[4].clone()),
+ ];
+
+ result.into_iter().for_each(|r| connected.try_send(r).unwrap());
+ }
+ );
+
+ for i in vec![2, 0, 4] {
+ overseer_send(
+ &mut virtual_overseer,
+ PoVDistributionMessage::NetworkBridgeUpdateV1(
+ NetworkBridgeEvent::PeerViewChange(
+ test_state.validator_peer_id[i].clone(),
+ View(vec![current]),
+ )
+ )
+ ).await;
+
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
+ to_peers,
+ payload,
+ )) => {
+ assert_eq!(to_peers, vec![test_state.validator_peer_id[i].clone()]);
+ assert_eq!(payload, awaiting_message(current.clone(), vec![pov_hash.clone()]));
+ }
+ );
+ }
+
+ overseer_send(
+ &mut virtual_overseer,
+ PoVDistributionMessage::NetworkBridgeUpdateV1(
+ NetworkBridgeEvent::PeerMessage(
+ test_state.validator_peer_id[2].clone(),
+ protocol_v1::PoVDistributionMessage::SendPoV(current, pov_hash, pov_block.clone()),
+ )
+ )
+ ).await;
+
+ assert_eq!(*pov_fetch_result.await.unwrap(), pov_block);
+
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(id, benefit)) => {
+ assert_eq!(benefit, BENEFIT_FRESH_POV);
+ assert_eq!(id, test_state.validator_peer_id[2].clone());
+ }
+ );
+
+ // Now let's test that if some peer is ahead of us we would still
+ // send `Await` on `FetchPoV` message to it.
+ let next_leaf = Hash::repeat_byte(10);
+
+ // A validator's view changes and now is lets say ahead of us.
+ overseer_send(
+ &mut virtual_overseer,
+ PoVDistributionMessage::NetworkBridgeUpdateV1(
+ NetworkBridgeEvent::PeerViewChange(
+ test_state.validator_peer_id[2].clone(),
+ View(vec![next_leaf]),
+ )
+ )
+ ).await;
+
+ let pov_block = PoV {
+ block_data: BlockData(vec![45, 46, 47]),
+ };
+
+ let pov_hash = pov_block.hash();
+
+ let candidate = CandidateDescriptor {
+ para_id: test_state.chain_ids[0],
+ pov_hash,
+ relay_parent: next_leaf.clone(),
+ ..Default::default()
+ };
+
+ let (tx, _pov_fetch_result) = oneshot::channel();
+
+ overseer_signal(
+ &mut virtual_overseer,
+ OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
+ activated: smallvec![next_leaf.clone()],
+ deactivated: smallvec![current.clone()],
+ })
+ ).await;
+
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ relay_parent,
+ RuntimeApiRequest::Validators(tx),
+ )) => {
+ assert_eq!(relay_parent, next_leaf);
+ tx.send(Ok(test_state.validator_public.clone())).unwrap();
+ }
+ );
+
+ overseer_send(
+ &mut virtual_overseer,
+ PoVDistributionMessage::FetchPoV(next_leaf.clone(), candidate, tx),
+ ).await;
+
+ // Obtain the availability cores.
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ relay_parent,
+ RuntimeApiRequest::AvailabilityCores(tx)
+ )) => {
+ assert_eq!(relay_parent, next_leaf);
+ tx.send(Ok(test_state.availability_cores.clone())).unwrap();
+ }
+ );
+
+ // Obtain the validator groups
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ relay_parent,
+ RuntimeApiRequest::ValidatorGroups(tx)
+ )) => {
+ assert_eq!(relay_parent, next_leaf);
+ tx.send(Ok(test_state.validator_groups.clone())).unwrap();
+ }
+ );
+
+ // obtain the validators per relay parent
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ relay_parent,
+ RuntimeApiRequest::Validators(tx),
+ )) => {
+ assert_eq!(relay_parent, next_leaf);
+ tx.send(Ok(test_state.validator_public.clone())).unwrap();
+ }
+ );
+
+ // obtain the validator_id to authority_id mapping
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ relay_parent,
+ RuntimeApiRequest::ValidatorDiscovery(validators, tx),
+ )) => {
+ assert_eq!(relay_parent, next_leaf);
+ assert_eq!(validators.len(), 3);
+ assert!(validators.iter().all(|v| test_state.validator_public.contains(&v)));
+
+ let result = vec![
+ Some(test_state.validator_authority_id[2].clone()),
+ Some(test_state.validator_authority_id[0].clone()),
+ Some(test_state.validator_authority_id[4].clone()),
+ ];
+ tx.send(Ok(result)).unwrap();
+ }
+ );
+
+ // We now should connect to our validator group.
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::NetworkBridge(
+ NetworkBridgeMessage::ConnectToValidators {
+ validator_ids,
+ mut connected,
+ ..
+ }
+ ) => {
+ assert_eq!(validator_ids.len(), 3);
+ assert!(validator_ids.iter().all(|id| test_state.validator_authority_id.contains(id)));
+
+ let result = vec![
+ (test_state.validator_authority_id[2].clone(), test_state.validator_peer_id[2].clone()),
+ (test_state.validator_authority_id[0].clone(), test_state.validator_peer_id[0].clone()),
+ (test_state.validator_authority_id[4].clone(), test_state.validator_peer_id[4].clone()),
+ ];
+
+ result.into_iter().for_each(|r| connected.try_send(r).unwrap());
+ }
+ );
+
+ // We already know that the leaf in question in the peer's view so we request
+ // a chunk from them right away.
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
+ to_peers,
+ payload,
+ )) => {
+ assert_eq!(to_peers, vec![test_state.validator_peer_id[2].clone()]);
+ assert_eq!(payload, awaiting_message(next_leaf.clone(), vec![pov_hash.clone()]));
+ }
+ );
+ });
+}
+
#[test]
fn distributes_to_those_awaiting_and_completes_local() {
let hash_a: Hash = [0; 32].into();
@@ -66,6 +556,7 @@ fn distributes_to_those_awaiting_and_completes_local() {
},
our_view: View(vec![hash_a, hash_b]),
metrics: Default::default(),
+ connection_requests: Default::default(),
};
let pool = sp_core::testing::TaskExecutor::new();
@@ -103,8 +594,10 @@ fn distributes_to_those_awaiting_and_completes_local() {
});
}
+
#[test]
fn we_inform_peers_with_same_view_we_are_awaiting() {
+
let hash_a: Hash = [0; 32].into();
let hash_b: Hash = [1; 32].into();
@@ -146,6 +639,7 @@ fn we_inform_peers_with_same_view_we_are_awaiting() {
},
our_view: View(vec![hash_a]),
metrics: Default::default(),
+ connection_requests: Default::default(),
};
let pool = sp_core::testing::TaskExecutor::new();
@@ -153,29 +647,135 @@ fn we_inform_peers_with_same_view_we_are_awaiting() {
let mut descriptor = CandidateDescriptor::default();
descriptor.pov_hash = pov_hash;
+ let para_id_1 = ParaId::from(1);
+ let para_id_2 = ParaId::from(2);
+
+ descriptor.para_id = para_id_1;
+
+ let availability_cores = vec![
+ CoreState::Scheduled(ScheduledCore {
+ para_id: para_id_1,
+ collator: None,
+ }),
+ CoreState::Scheduled(ScheduledCore {
+ para_id: para_id_2,
+ collator: None,
+ }),
+ ];
+
+ let validators = vec![
+ Sr25519Keyring::Alice,
+ Sr25519Keyring::Bob,
+ Sr25519Keyring::Charlie,
+ Sr25519Keyring::Dave,
+ Sr25519Keyring::Ferdie,
+ ];
+
+ let validator_authority_id = validator_authority_id(&validators);
+ let validators = validator_pubkeys(&validators);
+
+ let validator_peer_id: Vec<_> = std::iter::repeat_with(|| PeerId::random())
+ .take(validators.len())
+ .collect();
+
+ let validator_groups = vec![vec![2, 0, 4], vec![1], vec![3]];
+ let group_rotation_info = GroupRotationInfo {
+ session_start_block: 0,
+ group_rotation_frequency: 100,
+ now: 1,
+ };
+
+ let validator_groups = (validator_groups, group_rotation_info);
+
executor::block_on(async move {
- handle_fetch(
+ let handle_future = handle_fetch(
&mut state,
&mut ctx,
hash_a,
descriptor,
pov_send,
- ).await;
+ );
- assert_eq!(state.relay_parent_state[&hash_a].fetching[&pov_hash].len(), 1);
+ let check_future = async move {
+ //assert_eq!(state.relay_parent_state[&hash_a].fetching[&pov_hash].len(), 1);
+ assert_matches!(
+ handle.recv().await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ relay_parent,
+ RuntimeApiRequest::AvailabilityCores(tx)
+ )) => {
+ assert_eq!(relay_parent, hash_a);
+ tx.send(Ok(availability_cores)).unwrap();
+ }
+ );
- assert_matches!(
- handle.recv().await,
- AllMessages::NetworkBridge(
- NetworkBridgeMessage::SendValidationMessage(peers, message)
- ) => {
- assert_eq!(peers, vec![peer_a.clone()]);
- assert_eq!(
- message,
- awaiting_message(hash_a, vec![pov_hash]),
- );
- }
- )
+ assert_matches!(
+ handle.recv().await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ relay_parent,
+ RuntimeApiRequest::ValidatorGroups(tx)
+ )) => {
+ assert_eq!(relay_parent, hash_a);
+ tx.send(Ok(validator_groups)).unwrap();
+ }
+ );
+
+ assert_matches!(
+ handle.recv().await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ relay_parent,
+ RuntimeApiRequest::Validators(tx),
+ )) => {
+ assert_eq!(relay_parent, hash_a);
+ tx.send(Ok(validators.clone())).unwrap();
+ }
+ );
+
+ assert_matches!(
+ handle.recv().await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ relay_parent,
+ RuntimeApiRequest::ValidatorDiscovery(validators_res, tx),
+ )) => {
+ assert_eq!(relay_parent, hash_a);
+ assert_eq!(validators_res.len(), 3);
+ assert!(validators_res.iter().all(|v| validators.contains(&v)));
+
+ let result = vec![
+ Some(validator_authority_id[2].clone()),
+ Some(validator_authority_id[0].clone()),
+ Some(validator_authority_id[4].clone()),
+ ];
+
+ tx.send(Ok(result)).unwrap();
+ }
+ );
+
+ assert_matches!(
+ handle.recv().await,
+ AllMessages::NetworkBridge(
+ NetworkBridgeMessage::ConnectToValidators {
+ validator_ids,
+ mut connected,
+ ..
+ }
+ ) => {
+ assert_eq!(validator_ids.len(), 3);
+ assert!(validator_ids.iter().all(|id| validator_authority_id.contains(id)));
+
+ let result = vec![
+ (validator_authority_id[2].clone(), validator_peer_id[2].clone()),
+ (validator_authority_id[0].clone(), validator_peer_id[0].clone()),
+ (validator_authority_id[4].clone(), validator_peer_id[4].clone()),
+ ];
+
+ result.into_iter().for_each(|r| connected.try_send(r).unwrap());
+ }
+ );
+
+ };
+
+ futures::join!(handle_future, check_future);
});
}
@@ -224,6 +824,7 @@ fn peer_view_change_leads_to_us_informing() {
},
our_view: View(vec![hash_a]),
metrics: Default::default(),
+ connection_requests: Default::default(),
};
let pool = sp_core::testing::TaskExecutor::new();
@@ -296,6 +897,7 @@ fn peer_complete_fetch_and_is_rewarded() {
},
our_view: View(vec![hash_a]),
metrics: Default::default(),
+ connection_requests: Default::default(),
};
let pool = sp_core::testing::TaskExecutor::new();
@@ -385,6 +987,7 @@ fn peer_punished_for_sending_bad_pov() {
},
our_view: View(vec![hash_a]),
metrics: Default::default(),
+ connection_requests: Default::default(),
};
let pool = sp_core::testing::TaskExecutor::new();
@@ -449,6 +1052,7 @@ fn peer_punished_for_sending_unexpected_pov() {
},
our_view: View(vec![hash_a]),
metrics: Default::default(),
+ connection_requests: Default::default(),
};
let pool = sp_core::testing::TaskExecutor::new();
@@ -511,6 +1115,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() {
},
our_view: View(vec![hash_a]),
metrics: Default::default(),
+ connection_requests: Default::default(),
};
let pool = sp_core::testing::TaskExecutor::new();
@@ -570,6 +1175,7 @@ fn peer_reported_for_awaiting_too_much() {
},
our_view: View(vec![hash_a]),
metrics: Default::default(),
+ connection_requests: Default::default(),
};
let pool = sp_core::testing::TaskExecutor::new();
@@ -656,6 +1262,7 @@ fn peer_reported_for_awaiting_outside_their_view() {
},
our_view: View(vec![hash_a, hash_b]),
metrics: Default::default(),
+ connection_requests: Default::default(),
};
let pool = sp_core::testing::TaskExecutor::new();
@@ -719,6 +1326,7 @@ fn peer_reported_for_awaiting_outside_our_view() {
},
our_view: View(vec![hash_a]),
metrics: Default::default(),
+ connection_requests: Default::default(),
};
let pool = sp_core::testing::TaskExecutor::new();
@@ -797,6 +1405,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() {
},
our_view: View(vec![hash_a]),
metrics: Default::default(),
+ connection_requests: Default::default(),
};
let pool = sp_core::testing::TaskExecutor::new();
@@ -880,6 +1489,7 @@ fn peer_completing_request_no_longer_awaiting() {
},
our_view: View(vec![hash_a]),
metrics: Default::default(),
+ connection_requests: Default::default(),
};
let pool = sp_core::testing::TaskExecutor::new();
diff --git a/polkadot/node/subsystem-util/src/validator_discovery.rs b/polkadot/node/subsystem-util/src/validator_discovery.rs
index 83c405a3f4..357041d96b 100644
--- a/polkadot/node/subsystem-util/src/validator_discovery.rs
+++ b/polkadot/node/subsystem-util/src/validator_discovery.rs
@@ -115,6 +115,12 @@ pub struct ConnectionRequests {
requests: StreamUnordered<ConnectionRequest>,
}
+impl stream::FusedStream for ConnectionRequests {
+ fn is_terminated(&self) -> bool {
+ false
+ }
+}
+
impl ConnectionRequests {
/// Insert a new connection request.
///
@@ -133,6 +139,11 @@ impl ConnectionRequests {
Pin::new(&mut self.requests).remove(token);
}
}
+
+ /// Is a connection at this relay parent already present in the request
+ pub fn contains_request(&self, relay_parent: &Hash) -> bool {
+ self.id_map.contains_key(relay_parent)
+ }
}
impl stream::Stream for ConnectionRequests {