mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 19:51:05 +00:00
past-session validator discovery APIs (#2009)
* guide: fix formatting for SessionInfo module * primitives: SessionInfo type * punt on approval keys * ah, revert the type alias * session info runtime module skeleton * update the guide * runtime/configuration: sync with the guide * runtime/configuration: setters for newly added fields * runtime/configuration: set codec indexes * runtime/configuration: update test * primitives: fix SessionInfo definition * runtime/session_info: initial impl * runtime/session_info: use initializer for session handling (wip) * runtime/session_info: mock authority discovery trait * guide: update the initializer's order * runtime/session_info: tests skeleton * runtime/session_info: store n_delay_tranches in Configuration * runtime/session_info: punt on approval keys * runtime/session_info: add some basic tests * Update primitives/src/v1.rs * small fixes * remove codec index annotation on structs * fix off-by-one error * validator_discovery: accept a session index * runtime: replace validator_discovery api with session_info * Update runtime/parachains/src/session_info.rs Co-authored-by: Sergei Shulepov <sergei@parity.io> * runtime/session_info: add a comment about missing entries * runtime/session_info: define the keys * util: expose connect_to_past_session_validators * util: allow session_info requests for jobs * runtime-api: add mock test for session_info * collator-protocol: add session_index to test state * util: fix error message for runtime error * fix compilation * fix tests after merge with master Co-authored-by: Sergei Shulepov <sergei@parity.io>
This commit is contained in:
@@ -134,7 +134,7 @@ fn make_runtime_api_request<Client>(
|
||||
Request::CandidatePendingAvailability(para, sender) =>
|
||||
query!(candidate_pending_availability(para), sender),
|
||||
Request::CandidateEvents(sender) => query!(candidate_events(), sender),
|
||||
Request::ValidatorDiscovery(ids, sender) => query!(validator_discovery(ids), sender),
|
||||
Request::SessionInfo(index, sender) => query!(session_info(index), sender),
|
||||
Request::DmqContents(id, sender) => query!(dmq_contents(id), sender),
|
||||
Request::InboundHrmpChannelsContents(id, sender) => query!(inbound_hrmp_channels_contents(id), sender),
|
||||
}
|
||||
@@ -201,8 +201,8 @@ mod tests {
|
||||
use polkadot_primitives::v1::{
|
||||
ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData,
|
||||
Id as ParaId, OccupiedCoreAssumption, ValidationData, SessionIndex, ValidationCode,
|
||||
CommittedCandidateReceipt, CandidateEvent, AuthorityDiscoveryId, InboundDownwardMessage,
|
||||
BlockNumber, InboundHrmpMessage,
|
||||
CommittedCandidateReceipt, CandidateEvent, InboundDownwardMessage,
|
||||
BlockNumber, InboundHrmpMessage, SessionInfo,
|
||||
};
|
||||
use polkadot_node_subsystem_test_helpers as test_helpers;
|
||||
use sp_core::testing::TaskExecutor;
|
||||
@@ -216,6 +216,7 @@ mod tests {
|
||||
availability_cores: Vec<CoreState>,
|
||||
validation_data: HashMap<ParaId, ValidationData>,
|
||||
session_index_for_child: SessionIndex,
|
||||
session_info: HashMap<SessionIndex, SessionInfo>,
|
||||
validation_code: HashMap<ParaId, ValidationCode>,
|
||||
historical_validation_code: HashMap<ParaId, Vec<(BlockNumber, ValidationCode)>>,
|
||||
validation_outputs_results: HashMap<ParaId, bool>,
|
||||
@@ -289,6 +290,10 @@ mod tests {
|
||||
self.session_index_for_child.clone()
|
||||
}
|
||||
|
||||
fn session_info(&self, index: SessionIndex) -> Option<SessionInfo> {
|
||||
self.session_info.get(&index).cloned()
|
||||
}
|
||||
|
||||
fn validation_code(
|
||||
&self,
|
||||
para: ParaId,
|
||||
@@ -321,10 +326,6 @@ mod tests {
|
||||
self.candidate_events.clone()
|
||||
}
|
||||
|
||||
fn validator_discovery(ids: Vec<ValidatorId>) -> Vec<Option<AuthorityDiscoveryId>> {
|
||||
vec![None; ids.len()]
|
||||
}
|
||||
|
||||
fn dmq_contents(
|
||||
&self,
|
||||
recipient: ParaId,
|
||||
@@ -569,6 +570,33 @@ mod tests {
|
||||
futures::executor::block_on(future::join(subsystem_task, test_task));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn requests_session_info() {
|
||||
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
|
||||
let mut runtime_api = MockRuntimeApi::default();
|
||||
let session_index = 1;
|
||||
runtime_api.session_info.insert(session_index, Default::default());
|
||||
let runtime_api = Arc::new(runtime_api);
|
||||
|
||||
let relay_parent = [1; 32].into();
|
||||
|
||||
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
|
||||
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
|
||||
let test_task = async move {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
ctx_handle.send(FromOverseer::Communication {
|
||||
msg: RuntimeApiMessage::Request(relay_parent, Request::SessionInfo(session_index, tx))
|
||||
}).await;
|
||||
|
||||
assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));
|
||||
|
||||
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
|
||||
};
|
||||
|
||||
futures::executor::block_on(future::join(subsystem_task, test_task));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn requests_validation_code() {
|
||||
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
|
||||
|
||||
@@ -738,6 +738,7 @@ mod tests {
|
||||
use polkadot_primitives::v1::{
|
||||
BlockData, CandidateDescriptor, CollatorPair, ScheduledCore,
|
||||
ValidatorIndex, GroupRotationInfo, AuthorityDiscoveryId,
|
||||
SessionIndex, SessionInfo,
|
||||
};
|
||||
use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}};
|
||||
use polkadot_node_subsystem_util::TimeoutExt;
|
||||
@@ -776,6 +777,7 @@ mod tests {
|
||||
relay_parent: Hash,
|
||||
availability_core: CoreState,
|
||||
our_collator_pair: CollatorPair,
|
||||
session_index: SessionIndex,
|
||||
}
|
||||
|
||||
fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
|
||||
@@ -832,6 +834,7 @@ mod tests {
|
||||
relay_parent,
|
||||
availability_core,
|
||||
our_collator_pair,
|
||||
session_index: 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -841,6 +844,10 @@ mod tests {
|
||||
&self.validator_groups.0[0]
|
||||
}
|
||||
|
||||
fn current_session_index(&self) -> SessionIndex {
|
||||
self.session_index
|
||||
}
|
||||
|
||||
fn current_group_validator_peer_ids(&self) -> Vec<PeerId> {
|
||||
self.current_group_validator_indices().iter().map(|i| self.validator_peer_id[*i as usize].clone()).collect()
|
||||
}
|
||||
@@ -870,20 +877,6 @@ mod tests {
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn next_group_validator_ids(&self) -> Vec<ValidatorId> {
|
||||
self.next_group_validator_indices()
|
||||
.iter()
|
||||
.map(|i| self.validator_public[*i as usize].clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Returns the unique count of validators in the current and next group.
|
||||
fn current_and_next_group_unique_validator_count(&self) -> usize {
|
||||
let mut indices = self.next_group_validator_indices().iter().collect::<HashSet<_>>();
|
||||
indices.extend(self.current_group_validator_indices());
|
||||
indices.len()
|
||||
}
|
||||
|
||||
/// Generate a new relay parent and inform the subsystem about the new view.
|
||||
///
|
||||
/// If `merge_views == true` it means the subsystem will be informed that we working on the old `relay_parent`
|
||||
@@ -1090,20 +1083,33 @@ mod tests {
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::ValidatorDiscovery(validators, tx),
|
||||
RuntimeApiRequest::SessionIndexForChild(tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, test_state.relay_parent);
|
||||
assert_eq!(validators.len(), test_state.current_and_next_group_unique_validator_count());
|
||||
tx.send(Ok(test_state.current_session_index())).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
let current_validators = test_state.current_group_validator_ids();
|
||||
let next_validators = test_state.next_group_validator_ids();
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::SessionInfo(index, tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, test_state.relay_parent);
|
||||
assert_eq!(index, test_state.current_session_index());
|
||||
|
||||
assert!(validators.iter().all(|v| current_validators.contains(&v) || next_validators.contains(&v)));
|
||||
let validators = test_state.current_group_validator_ids();
|
||||
let current_discovery_keys = test_state.current_group_validator_authority_ids();
|
||||
let next_discovery_keys = test_state.next_group_validator_authority_ids();
|
||||
|
||||
let current_validators = test_state.current_group_validator_authority_ids();
|
||||
let next_validators = test_state.next_group_validator_authority_ids();
|
||||
let discovery_keys = [¤t_discovery_keys[..], &next_discovery_keys[..]].concat();
|
||||
|
||||
tx.send(Ok(current_validators.into_iter().chain(next_validators).map(Some).collect())).unwrap();
|
||||
tx.send(Ok(Some(SessionInfo {
|
||||
validators,
|
||||
discovery_keys,
|
||||
..Default::default()
|
||||
}))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
|
||||
@@ -60,16 +60,6 @@ enum Error {
|
||||
Prometheus(#[from] prometheus::PrometheusError),
|
||||
}
|
||||
|
||||
impl From<util::validator_discovery::Error> for Error {
|
||||
fn from(me: util::validator_discovery::Error) -> Self {
|
||||
match me {
|
||||
util::validator_discovery::Error::Subsystem(s) => Error::Subsystem(s),
|
||||
util::validator_discovery::Error::RuntimeApi(ra) => Error::RuntimeApi(ra),
|
||||
util::validator_discovery::Error::Oneshot(c) => Error::Oneshot(c),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
/// What side of the collator protocol is being engaged
|
||||
|
||||
@@ -27,8 +27,6 @@ pub enum Error {
|
||||
#[error(transparent)]
|
||||
Runtime(#[from] polkadot_subsystem::errors::RuntimeApiError),
|
||||
#[error(transparent)]
|
||||
ValidatorDiscovery(#[from] polkadot_node_subsystem_util::validator_discovery::Error),
|
||||
#[error(transparent)]
|
||||
Util(#[from] polkadot_node_subsystem_util::Error),
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use sp_keyring::Sr25519Keyring;
|
||||
|
||||
use polkadot_primitives::v1::{
|
||||
AuthorityDiscoveryId, BlockData, CoreState, GroupRotationInfo, Id as ParaId,
|
||||
ScheduledCore, ValidatorIndex,
|
||||
ScheduledCore, ValidatorIndex, SessionIndex, SessionInfo,
|
||||
};
|
||||
use polkadot_subsystem::messages::{RuntimeApiMessage, RuntimeApiRequest};
|
||||
use polkadot_node_subsystem_test_helpers as test_helpers;
|
||||
@@ -37,8 +37,10 @@ fn validator_authority_id(val_ids: &[Sr25519Keyring]) -> Vec<AuthorityDiscoveryI
|
||||
val_ids.iter().map(|v| v.public().into()).collect()
|
||||
}
|
||||
|
||||
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>;
|
||||
|
||||
struct TestHarness {
|
||||
virtual_overseer: test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
|
||||
virtual_overseer: VirtualOverseer,
|
||||
}
|
||||
|
||||
fn test_harness<T: Future<Output = ()>>(
|
||||
@@ -75,7 +77,7 @@ fn test_harness<T: Future<Output = ()>>(
|
||||
const TIMEOUT: Duration = Duration::from_millis(100);
|
||||
|
||||
async fn overseer_send(
|
||||
overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
|
||||
overseer: &mut VirtualOverseer,
|
||||
msg: PoVDistributionMessage,
|
||||
) {
|
||||
trace!("Sending message:\n{:?}", &msg);
|
||||
@@ -87,7 +89,7 @@ async fn overseer_send(
|
||||
}
|
||||
|
||||
async fn overseer_recv(
|
||||
overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
|
||||
overseer: &mut VirtualOverseer,
|
||||
) -> AllMessages {
|
||||
let msg = overseer_recv_with_timeout(overseer, TIMEOUT)
|
||||
.await
|
||||
@@ -99,7 +101,7 @@ async fn overseer_recv(
|
||||
}
|
||||
|
||||
async fn overseer_recv_with_timeout(
|
||||
overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
|
||||
overseer: &mut VirtualOverseer,
|
||||
timeout: Duration,
|
||||
) -> Option<AllMessages> {
|
||||
trace!("Waiting for message...");
|
||||
@@ -110,7 +112,7 @@ async fn overseer_recv_with_timeout(
|
||||
}
|
||||
|
||||
async fn overseer_signal(
|
||||
overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
|
||||
overseer: &mut VirtualOverseer,
|
||||
signal: OverseerSignal,
|
||||
) {
|
||||
overseer
|
||||
@@ -130,6 +132,7 @@ struct TestState {
|
||||
validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
|
||||
relay_parent: Hash,
|
||||
availability_cores: Vec<CoreState>,
|
||||
session_index: SessionIndex,
|
||||
}
|
||||
|
||||
impl Default for TestState {
|
||||
@@ -184,10 +187,56 @@ impl Default for TestState {
|
||||
validator_groups,
|
||||
relay_parent,
|
||||
availability_cores,
|
||||
session_index: 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn test_validator_discovery(
|
||||
virtual_overseer: &mut VirtualOverseer,
|
||||
expected_relay_parent: Hash,
|
||||
session_index: SessionIndex,
|
||||
validator_ids: &[ValidatorId],
|
||||
discovery_ids: &[AuthorityDiscoveryId],
|
||||
validator_group: &[ValidatorIndex],
|
||||
) {
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::SessionIndexForChild(tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, expected_relay_parent);
|
||||
tx.send(Ok(session_index)).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::SessionInfo(index, tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, expected_relay_parent);
|
||||
assert_eq!(index, session_index);
|
||||
|
||||
let validators = validator_group.iter()
|
||||
.map(|idx| validator_ids[*idx as usize].clone())
|
||||
.collect();
|
||||
|
||||
let discovery_keys = validator_group.iter()
|
||||
.map(|idx| discovery_ids[*idx as usize].clone())
|
||||
.collect();
|
||||
|
||||
tx.send(Ok(Some(SessionInfo {
|
||||
validators,
|
||||
discovery_keys,
|
||||
..Default::default()
|
||||
}))).unwrap();
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ask_validators_for_povs() {
|
||||
let test_state = TestState::default();
|
||||
@@ -271,25 +320,14 @@ fn ask_validators_for_povs() {
|
||||
}
|
||||
);
|
||||
|
||||
// obtain the validator_id to authority_id mapping
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::ValidatorDiscovery(validators, tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, current);
|
||||
assert_eq!(validators.len(), 3);
|
||||
assert!(validators.iter().all(|v| test_state.validator_public.contains(&v)));
|
||||
|
||||
let result = vec![
|
||||
Some(test_state.validator_authority_id[2].clone()),
|
||||
Some(test_state.validator_authority_id[0].clone()),
|
||||
Some(test_state.validator_authority_id[4].clone()),
|
||||
];
|
||||
tx.send(Ok(result)).unwrap();
|
||||
}
|
||||
);
|
||||
test_validator_discovery(
|
||||
&mut virtual_overseer,
|
||||
current,
|
||||
test_state.session_index,
|
||||
&test_state.validator_public,
|
||||
&test_state.validator_authority_id,
|
||||
&test_state.validator_groups.0[0],
|
||||
).await;
|
||||
|
||||
// We now should connect to our validator group.
|
||||
assert_matches!(
|
||||
@@ -448,24 +486,14 @@ fn ask_validators_for_povs() {
|
||||
);
|
||||
|
||||
// obtain the validator_id to authority_id mapping
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::ValidatorDiscovery(validators, tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, next_leaf);
|
||||
assert_eq!(validators.len(), 3);
|
||||
assert!(validators.iter().all(|v| test_state.validator_public.contains(&v)));
|
||||
|
||||
let result = vec![
|
||||
Some(test_state.validator_authority_id[2].clone()),
|
||||
Some(test_state.validator_authority_id[0].clone()),
|
||||
Some(test_state.validator_authority_id[4].clone()),
|
||||
];
|
||||
tx.send(Ok(result)).unwrap();
|
||||
}
|
||||
);
|
||||
test_validator_discovery(
|
||||
&mut virtual_overseer,
|
||||
next_leaf,
|
||||
test_state.session_index,
|
||||
&test_state.validator_public,
|
||||
&test_state.validator_authority_id,
|
||||
&test_state.validator_groups.0[0],
|
||||
).await;
|
||||
|
||||
// We now should connect to our validator group.
|
||||
assert_matches!(
|
||||
@@ -716,7 +744,7 @@ fn we_inform_peers_with_same_view_we_are_awaiting() {
|
||||
RuntimeApiRequest::ValidatorGroups(tx)
|
||||
)) => {
|
||||
assert_eq!(relay_parent, hash_a);
|
||||
tx.send(Ok(validator_groups)).unwrap();
|
||||
tx.send(Ok(validator_groups.clone())).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
@@ -731,25 +759,14 @@ fn we_inform_peers_with_same_view_we_are_awaiting() {
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::ValidatorDiscovery(validators_res, tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, hash_a);
|
||||
assert_eq!(validators_res.len(), 3);
|
||||
assert!(validators_res.iter().all(|v| validators.contains(&v)));
|
||||
|
||||
let result = vec![
|
||||
Some(validator_authority_id[2].clone()),
|
||||
Some(validator_authority_id[0].clone()),
|
||||
Some(validator_authority_id[4].clone()),
|
||||
];
|
||||
|
||||
tx.send(Ok(result)).unwrap();
|
||||
}
|
||||
);
|
||||
test_validator_discovery(
|
||||
&mut handle,
|
||||
hash_a,
|
||||
1,
|
||||
&validators,
|
||||
&validator_authority_id,
|
||||
&validator_groups.0[0],
|
||||
).await;
|
||||
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
|
||||
@@ -37,6 +37,7 @@ use polkadot_primitives::v1::{
|
||||
CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData,
|
||||
GroupRotationInfo, Hash, Id as ParaId, ValidationData, OccupiedCoreAssumption,
|
||||
SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex,
|
||||
SessionInfo,
|
||||
};
|
||||
use sp_core::{
|
||||
traits::SpawnNamed,
|
||||
@@ -193,6 +194,7 @@ specialize_requests! {
|
||||
fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
|
||||
fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
|
||||
fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
|
||||
fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
|
||||
}
|
||||
|
||||
/// Request some data from the `RuntimeApi` via a SubsystemContext.
|
||||
@@ -274,6 +276,7 @@ specialize_requests_ctx! {
|
||||
fn request_validation_code_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
|
||||
fn request_candidate_pending_availability_ctx(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
|
||||
fn request_candidate_events_ctx() -> Vec<CandidateEvent>; CandidateEvents;
|
||||
fn request_session_info_ctx(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
|
||||
}
|
||||
|
||||
/// From the given set of validators, find the first key we can sign with, if any.
|
||||
|
||||
@@ -20,34 +20,20 @@ use std::collections::HashMap;
|
||||
use std::pin::Pin;
|
||||
|
||||
use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
channel::mpsc,
|
||||
task::{Poll, self},
|
||||
stream,
|
||||
};
|
||||
use streamunordered::{StreamUnordered, StreamYield};
|
||||
use thiserror::Error;
|
||||
|
||||
use polkadot_node_subsystem::{
|
||||
errors::RuntimeApiError, SubsystemError,
|
||||
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeMessage},
|
||||
errors::RuntimeApiError,
|
||||
messages::{AllMessages, NetworkBridgeMessage},
|
||||
SubsystemContext,
|
||||
};
|
||||
use polkadot_primitives::v1::{Hash, ValidatorId, AuthorityDiscoveryId};
|
||||
use polkadot_primitives::v1::{Hash, ValidatorId, AuthorityDiscoveryId, SessionIndex};
|
||||
use sc_network::PeerId;
|
||||
|
||||
/// Error when making a request to connect to validators.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
/// Attempted to send or receive on a oneshot channel which had been canceled
|
||||
#[error(transparent)]
|
||||
Oneshot(#[from] oneshot::Canceled),
|
||||
/// A subsystem error.
|
||||
#[error(transparent)]
|
||||
Subsystem(#[from] SubsystemError),
|
||||
/// An error in the Runtime API.
|
||||
#[error(transparent)]
|
||||
RuntimeApi(#[from] RuntimeApiError),
|
||||
}
|
||||
use crate::Error;
|
||||
|
||||
/// Utility function to make it easier to connect to validators.
|
||||
pub async fn connect_to_validators<Context: SubsystemContext>(
|
||||
@@ -55,17 +41,42 @@ pub async fn connect_to_validators<Context: SubsystemContext>(
|
||||
relay_parent: Hash,
|
||||
validators: Vec<ValidatorId>,
|
||||
) -> Result<ConnectionRequest, Error> {
|
||||
// ValidatorId -> AuthorityDiscoveryId
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let current_index = crate::request_session_index_for_child_ctx(relay_parent, ctx).await?.await??;
|
||||
connect_to_past_session_validators(ctx, relay_parent, validators, current_index).await
|
||||
}
|
||||
|
||||
ctx.send_message(AllMessages::RuntimeApi(
|
||||
RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::ValidatorDiscovery(validators.clone(), tx),
|
||||
)
|
||||
)).await;
|
||||
/// Utility function to make it easier to connect to validators in the past sessions.
|
||||
pub async fn connect_to_past_session_validators<Context: SubsystemContext>(
|
||||
ctx: &mut Context,
|
||||
relay_parent: Hash,
|
||||
validators: Vec<ValidatorId>,
|
||||
session_index: SessionIndex,
|
||||
) -> Result<ConnectionRequest, Error> {
|
||||
let session_info = crate::request_session_info_ctx(
|
||||
relay_parent,
|
||||
session_index,
|
||||
ctx,
|
||||
).await?.await??;
|
||||
|
||||
let (session_validators, discovery_keys) = match session_info {
|
||||
Some(info) => (info.validators, info.discovery_keys),
|
||||
None => return Err(RuntimeApiError::from(
|
||||
format!("No SessionInfo found for the index {}", session_index)
|
||||
).into()),
|
||||
};
|
||||
|
||||
let id_to_index = session_validators.iter()
|
||||
.zip(0usize..)
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
// We assume the same ordering in authorities as in validators so we can do an index search
|
||||
let maybe_authorities: Vec<_> = validators.iter()
|
||||
.map(|id| {
|
||||
let validator_index = id_to_index.get(&id);
|
||||
validator_index.and_then(|i| discovery_keys.get(*i).cloned())
|
||||
})
|
||||
.collect();
|
||||
|
||||
let maybe_authorities = rx.await??;
|
||||
let authorities: Vec<_> = maybe_authorities.iter()
|
||||
.cloned()
|
||||
.filter_map(|id| id)
|
||||
|
||||
@@ -31,7 +31,7 @@ use polkadot_node_primitives::{
|
||||
CollationGenerationConfig, MisbehaviorReport, SignedFullStatement, ValidationResult,
|
||||
};
|
||||
use polkadot_primitives::v1::{
|
||||
AuthorityDiscoveryId, AvailableData, BackedCandidate, BlockNumber,
|
||||
AuthorityDiscoveryId, AvailableData, BackedCandidate, BlockNumber, SessionInfo,
|
||||
Header as BlockHeader, CandidateDescriptor, CandidateEvent, CandidateReceipt,
|
||||
CollatorId, CommittedCandidateReceipt, CoreState, ErasureChunk,
|
||||
GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption,
|
||||
@@ -434,14 +434,8 @@ pub enum RuntimeApiRequest {
|
||||
/// Get all events concerning candidates (backing, inclusion, time-out) in the parent of
|
||||
/// the block in whose state this request is executed.
|
||||
CandidateEvents(RuntimeApiSender<Vec<CandidateEvent>>),
|
||||
/// Get the `AuthorityDiscoveryId`s corresponding to the given `ValidatorId`s.
|
||||
/// Currently this request is limited to validators in the current session.
|
||||
///
|
||||
/// Returns `None` for validators not found in the current session.
|
||||
ValidatorDiscovery(
|
||||
Vec<ValidatorId>,
|
||||
RuntimeApiSender<Vec<Option<AuthorityDiscoveryId>>>,
|
||||
),
|
||||
/// Get the session info for the given session, if stored.
|
||||
SessionInfo(SessionIndex, RuntimeApiSender<Option<SessionInfo>>),
|
||||
/// Get all the pending inbound messages in the downward message queue for a para.
|
||||
DmqContents(
|
||||
ParaId,
|
||||
|
||||
Reference in New Issue
Block a user