mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
gossip: move authorities request to runtime api subsystem (#2798)
This commit is contained in:
Generated
+1
-2
@@ -5636,9 +5636,7 @@ dependencies = [
|
||||
"polkadot-node-subsystem",
|
||||
"polkadot-node-subsystem-util",
|
||||
"polkadot-primitives",
|
||||
"sp-api",
|
||||
"sp-application-crypto",
|
||||
"sp-authority-discovery",
|
||||
"sp-keystore",
|
||||
"tracing",
|
||||
]
|
||||
@@ -5890,6 +5888,7 @@ dependencies = [
|
||||
"polkadot-node-subsystem-util",
|
||||
"polkadot-primitives",
|
||||
"sp-api",
|
||||
"sp-authority-discovery",
|
||||
"sp-consensus-babe",
|
||||
"sp-core",
|
||||
"tracing",
|
||||
|
||||
@@ -11,6 +11,7 @@ memory-lru = "0.1.0"
|
||||
parity-util-mem = { version = "0.9.0", default-features = false }
|
||||
|
||||
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ use polkadot_primitives::v1::{
|
||||
CoreState, GroupRotationInfo, InboundDownwardMessage, InboundHrmpMessage, Hash,
|
||||
PersistedValidationData, Id as ParaId, OccupiedCoreAssumption,
|
||||
SessionIndex, SessionInfo, ValidationCode, ValidatorId, ValidatorIndex,
|
||||
AuthorityDiscoveryId,
|
||||
};
|
||||
use sp_consensus_babe::Epoch;
|
||||
use parity_util_mem::{MallocSizeOf, MallocSizeOfExt};
|
||||
@@ -28,6 +29,7 @@ use memory_lru::{MemoryLruCache, ResidentSize};
|
||||
|
||||
use std::collections::btree_map::BTreeMap;
|
||||
|
||||
const AUTHORITIES_CACHE_SIZE: usize = 128 * 1024;
|
||||
const VALIDATORS_CACHE_SIZE: usize = 64 * 1024;
|
||||
const VALIDATOR_GROUPS_CACHE_SIZE: usize = 64 * 1024;
|
||||
const AVAILABILITY_CORES_CACHE_SIZE: usize = 64 * 1024;
|
||||
@@ -59,7 +61,18 @@ impl<T> ResidentSize for DoesNotAllocate<T> {
|
||||
}
|
||||
}
|
||||
|
||||
// this is an ugly workaround for `AuthorityDiscoveryId`
|
||||
// not implementing `MallocSizeOf`
|
||||
struct VecOfDoesNotAllocate<T>(Vec<T>);
|
||||
|
||||
impl<T> ResidentSize for VecOfDoesNotAllocate<T> {
|
||||
fn resident_size(&self) -> usize {
|
||||
std::mem::size_of::<T>() * self.0.capacity()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct RequestResultCache {
|
||||
authorities: MemoryLruCache<Hash, VecOfDoesNotAllocate<AuthorityDiscoveryId>>,
|
||||
validators: MemoryLruCache<Hash, ResidentSizeOf<Vec<ValidatorId>>>,
|
||||
validator_groups: MemoryLruCache<Hash, ResidentSizeOf<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo)>>,
|
||||
availability_cores: MemoryLruCache<Hash, ResidentSizeOf<Vec<CoreState>>>,
|
||||
@@ -79,6 +92,7 @@ pub(crate) struct RequestResultCache {
|
||||
impl Default for RequestResultCache {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
authorities: MemoryLruCache::new(AUTHORITIES_CACHE_SIZE),
|
||||
validators: MemoryLruCache::new(VALIDATORS_CACHE_SIZE),
|
||||
validator_groups: MemoryLruCache::new(VALIDATOR_GROUPS_CACHE_SIZE),
|
||||
availability_cores: MemoryLruCache::new(AVAILABILITY_CORES_CACHE_SIZE),
|
||||
@@ -98,6 +112,14 @@ impl Default for RequestResultCache {
|
||||
}
|
||||
|
||||
impl RequestResultCache {
|
||||
pub(crate) fn authorities(&mut self, relay_parent: &Hash) -> Option<&Vec<AuthorityDiscoveryId>> {
|
||||
self.authorities.get(relay_parent).map(|v| &v.0)
|
||||
}
|
||||
|
||||
pub(crate) fn cache_authorities(&mut self, relay_parent: Hash, authorities: Vec<AuthorityDiscoveryId>) {
|
||||
self.authorities.insert(relay_parent, VecOfDoesNotAllocate(authorities));
|
||||
}
|
||||
|
||||
pub(crate) fn validators(&mut self, relay_parent: &Hash) -> Option<&Vec<ValidatorId>> {
|
||||
self.validators.get(relay_parent).map(|v| &v.0)
|
||||
}
|
||||
@@ -212,6 +234,7 @@ impl RequestResultCache {
|
||||
}
|
||||
|
||||
pub(crate) enum RequestResult {
|
||||
Authorities(Hash, Vec<AuthorityDiscoveryId>),
|
||||
Validators(Hash, Vec<ValidatorId>),
|
||||
ValidatorGroups(Hash, (Vec<Vec<ValidatorIndex>>, GroupRotationInfo)),
|
||||
AvailabilityCores(Hash, Vec<CoreState>),
|
||||
|
||||
@@ -34,6 +34,7 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus};
|
||||
use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost};
|
||||
|
||||
use sp_api::ProvideRuntimeApi;
|
||||
use sp_authority_discovery::AuthorityDiscoveryApi;
|
||||
use sp_core::traits::SpawnNamed;
|
||||
use sp_consensus_babe::BabeApi;
|
||||
|
||||
@@ -83,7 +84,7 @@ impl<Client> RuntimeApiSubsystem<Client> {
|
||||
|
||||
impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
|
||||
Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
|
||||
Client::Api: ParachainHost<Block> + BabeApi<Block>,
|
||||
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
|
||||
Context: SubsystemContext<Message = RuntimeApiMessage>
|
||||
{
|
||||
fn start(self, ctx: Context) -> SpawnedSubsystem {
|
||||
@@ -96,12 +97,14 @@ impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
|
||||
|
||||
impl<Client> RuntimeApiSubsystem<Client> where
|
||||
Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
|
||||
Client::Api: ParachainHost<Block> + BabeApi<Block>,
|
||||
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
|
||||
{
|
||||
fn store_cache(&mut self, result: RequestResult) {
|
||||
use RequestResult::*;
|
||||
|
||||
match result {
|
||||
Authorities(relay_parent, authorities) =>
|
||||
self.requests_cache.cache_authorities(relay_parent, authorities),
|
||||
Validators(relay_parent, validators) =>
|
||||
self.requests_cache.cache_validators(relay_parent, validators),
|
||||
ValidatorGroups(relay_parent, groups) =>
|
||||
@@ -160,6 +163,8 @@ impl<Client> RuntimeApiSubsystem<Client> where
|
||||
}
|
||||
|
||||
match request {
|
||||
Request::Authorities(sender) => query!(authorities(), sender)
|
||||
.map(|sender| Request::Authorities(sender)),
|
||||
Request::Validators(sender) => query!(validators(), sender)
|
||||
.map(|sender| Request::Validators(sender)),
|
||||
Request::ValidatorGroups(sender) => query!(validator_groups(), sender)
|
||||
@@ -263,7 +268,7 @@ async fn run<Client>(
|
||||
mut subsystem: RuntimeApiSubsystem<Client>,
|
||||
) -> SubsystemResult<()> where
|
||||
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
||||
Client::Api: ParachainHost<Block> + BabeApi<Block>,
|
||||
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
|
||||
{
|
||||
loop {
|
||||
select! {
|
||||
@@ -291,7 +296,7 @@ fn make_runtime_api_request<Client>(
|
||||
) -> Option<RequestResult>
|
||||
where
|
||||
Client: ProvideRuntimeApi<Block>,
|
||||
Client::Api: ParachainHost<Block> + BabeApi<Block>,
|
||||
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
|
||||
{
|
||||
let _timer = metrics.time_make_runtime_api_request();
|
||||
|
||||
@@ -327,6 +332,7 @@ where
|
||||
}
|
||||
|
||||
match request {
|
||||
Request::Authorities(sender) => query!(Authorities, authorities(), sender),
|
||||
Request::Validators(sender) => query!(Validators, validators(), sender),
|
||||
Request::ValidatorGroups(sender) => query!(ValidatorGroups, validator_groups(), sender),
|
||||
Request::AvailabilityCores(sender) => query!(AvailabilityCores, availability_cores(), sender),
|
||||
@@ -416,7 +422,7 @@ mod tests {
|
||||
ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData,
|
||||
Id as ParaId, OccupiedCoreAssumption, SessionIndex, ValidationCode,
|
||||
CommittedCandidateReceipt, CandidateEvent, InboundDownwardMessage,
|
||||
BlockNumber, InboundHrmpMessage, SessionInfo,
|
||||
BlockNumber, InboundHrmpMessage, SessionInfo, AuthorityDiscoveryId,
|
||||
};
|
||||
use polkadot_node_subsystem_test_helpers as test_helpers;
|
||||
use sp_core::testing::TaskExecutor;
|
||||
@@ -428,6 +434,7 @@ mod tests {
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
struct MockRuntimeApi {
|
||||
authorities: Vec<AuthorityDiscoveryId>,
|
||||
validators: Vec<ValidatorId>,
|
||||
validator_groups: Vec<Vec<ValidatorIndex>>,
|
||||
availability_cores: Vec<CoreState>,
|
||||
@@ -582,6 +589,36 @@ mod tests {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl AuthorityDiscoveryApi<Block> for MockRuntimeApi {
|
||||
fn authorities(&self) -> Vec<AuthorityDiscoveryId> {
|
||||
self.authorities.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn requests_authorities() {
|
||||
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
|
||||
let runtime_api = Arc::new(MockRuntimeApi::default());
|
||||
let relay_parent = [1; 32].into();
|
||||
let spawner = sp_core::testing::TaskExecutor::new();
|
||||
|
||||
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
|
||||
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
|
||||
let test_task = async move {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
ctx_handle.send(FromOverseer::Communication {
|
||||
msg: RuntimeApiMessage::Request(relay_parent, Request::Authorities(tx))
|
||||
}).await;
|
||||
|
||||
assert_eq!(rx.await.unwrap().unwrap(), runtime_api.authorities);
|
||||
|
||||
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
|
||||
};
|
||||
|
||||
futures::executor::block_on(future::join(subsystem_task, test_task));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -5,10 +5,8 @@ authors = ["Parity Technologies <admin@parity.io>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
|
||||
polkadot-node-network-protocol = { path = "../protocol" }
|
||||
polkadot-node-subsystem = { path = "../../subsystem" }
|
||||
|
||||
@@ -19,9 +19,6 @@
|
||||
//! the gossiping subsystems on every new session.
|
||||
|
||||
use futures::{channel::mpsc, FutureExt as _};
|
||||
use std::sync::Arc;
|
||||
use sp_api::ProvideRuntimeApi;
|
||||
use sp_authority_discovery::AuthorityDiscoveryApi;
|
||||
use polkadot_node_subsystem::{
|
||||
messages::{
|
||||
GossipSupportMessage,
|
||||
@@ -34,7 +31,7 @@ use polkadot_node_subsystem_util::{
|
||||
self as util,
|
||||
};
|
||||
use polkadot_primitives::v1::{
|
||||
Hash, SessionIndex, AuthorityDiscoveryId, Block, BlockId,
|
||||
Hash, SessionIndex, AuthorityDiscoveryId,
|
||||
};
|
||||
use polkadot_node_network_protocol::{peer_set::PeerSet, PeerId};
|
||||
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
|
||||
@@ -43,8 +40,7 @@ use sp_application_crypto::{Public, AppKey};
|
||||
const LOG_TARGET: &str = "parachain::gossip-support";
|
||||
|
||||
/// The Gossip Support subsystem.
|
||||
pub struct GossipSupport<Client> {
|
||||
client: Arc<Client>,
|
||||
pub struct GossipSupport {
|
||||
keystore: SyncCryptoStorePtr,
|
||||
}
|
||||
|
||||
@@ -55,15 +51,10 @@ struct State {
|
||||
_last_connection_request: Option<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>,
|
||||
}
|
||||
|
||||
impl<Client> GossipSupport<Client>
|
||||
where
|
||||
Client: ProvideRuntimeApi<Block>,
|
||||
Client::Api: AuthorityDiscoveryApi<Block>,
|
||||
{
|
||||
impl GossipSupport {
|
||||
/// Create a new instance of the [`GossipSupport`] subsystem.
|
||||
pub fn new(keystore: SyncCryptoStorePtr, client: Arc<Client>) -> Self {
|
||||
pub fn new(keystore: SyncCryptoStorePtr) -> Self {
|
||||
Self {
|
||||
client,
|
||||
keystore,
|
||||
}
|
||||
}
|
||||
@@ -74,7 +65,7 @@ where
|
||||
Context: SubsystemContext<Message = GossipSupportMessage>,
|
||||
{
|
||||
let mut state = State::default();
|
||||
let Self { client, keystore } = self;
|
||||
let Self { keystore } = self;
|
||||
loop {
|
||||
let message = match ctx.recv().await {
|
||||
Ok(message) => message,
|
||||
@@ -96,7 +87,7 @@ where
|
||||
tracing::trace!(target: LOG_TARGET, "active leaves signal");
|
||||
|
||||
let leaves = activated.into_iter().map(|a| a.hash);
|
||||
if let Err(e) = state.handle_active_leaves(&mut ctx, client.clone(), &keystore, leaves).await {
|
||||
if let Err(e) = state.handle_active_leaves(&mut ctx, &keystore, leaves).await {
|
||||
tracing::debug!(target: LOG_TARGET, error = ?e);
|
||||
}
|
||||
}
|
||||
@@ -109,18 +100,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn determine_relevant_authorities<Client>(
|
||||
client: Arc<Client>,
|
||||
async fn determine_relevant_authorities(
|
||||
ctx: &mut impl SubsystemContext,
|
||||
relay_parent: Hash,
|
||||
) -> Result<Vec<AuthorityDiscoveryId>, util::Error>
|
||||
where
|
||||
Client: ProvideRuntimeApi<Block>,
|
||||
Client::Api: AuthorityDiscoveryApi<Block>,
|
||||
{
|
||||
let api = client.runtime_api();
|
||||
let result = api.authorities(&BlockId::Hash(relay_parent))
|
||||
.map_err(|e| util::Error::RuntimeApi(format!("{:?}", e).into()));
|
||||
result
|
||||
) -> Result<Vec<AuthorityDiscoveryId>, util::Error> {
|
||||
let authorities = util::request_authorities_ctx(relay_parent, ctx).await?.await??;
|
||||
Ok(authorities)
|
||||
}
|
||||
|
||||
/// Return an error if we're not a validator in the given set (do not have keys).
|
||||
@@ -143,17 +128,12 @@ impl State {
|
||||
/// 1. Determine if the current session index has changed.
|
||||
/// 2. If it has, determine relevant validators
|
||||
/// and issue a connection request.
|
||||
async fn handle_active_leaves<Client>(
|
||||
async fn handle_active_leaves(
|
||||
&mut self,
|
||||
ctx: &mut impl SubsystemContext,
|
||||
client: Arc<Client>,
|
||||
keystore: &SyncCryptoStorePtr,
|
||||
leaves: impl Iterator<Item = Hash>,
|
||||
) -> Result<(), util::Error>
|
||||
where
|
||||
Client: ProvideRuntimeApi<Block>,
|
||||
Client::Api: AuthorityDiscoveryApi<Block>,
|
||||
{
|
||||
) -> Result<(), util::Error> {
|
||||
for leaf in leaves {
|
||||
let current_index = util::request_session_index_for_child_ctx(leaf, ctx).await?.await??;
|
||||
let maybe_new_session = match self.last_session_index {
|
||||
@@ -163,7 +143,7 @@ impl State {
|
||||
|
||||
if let Some((new_session, relay_parent)) = maybe_new_session {
|
||||
tracing::debug!(target: LOG_TARGET, %new_session, "New session detected");
|
||||
let authorities = determine_relevant_authorities(client.clone(), relay_parent).await?;
|
||||
let authorities = determine_relevant_authorities(ctx, relay_parent).await?;
|
||||
ensure_i_am_an_authority(keystore, &authorities).await?;
|
||||
tracing::debug!(target: LOG_TARGET, num = ?authorities.len(), "Issuing a connection request");
|
||||
|
||||
@@ -182,11 +162,9 @@ impl State {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Client, Context> Subsystem<Context> for GossipSupport<Client>
|
||||
impl<Context> Subsystem<Context> for GossipSupport
|
||||
where
|
||||
Context: SubsystemContext<Message = GossipSupportMessage> + Sync + Send,
|
||||
Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
|
||||
Client::Api: AuthorityDiscoveryApi<Block>,
|
||||
{
|
||||
fn start(self, ctx: Context) -> SpawnedSubsystem {
|
||||
let future = self.run(ctx)
|
||||
|
||||
@@ -570,7 +570,6 @@ where
|
||||
),
|
||||
gossip_support: GossipSupportSubsystem::new(
|
||||
keystore.clone(),
|
||||
runtime_client.clone(),
|
||||
),
|
||||
};
|
||||
|
||||
|
||||
@@ -39,6 +39,7 @@ use polkadot_primitives::v1::{
|
||||
CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData,
|
||||
GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption,
|
||||
SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex, SessionInfo,
|
||||
AuthorityDiscoveryId,
|
||||
};
|
||||
use sp_core::{traits::SpawnNamed, Public};
|
||||
use sp_application_crypto::AppKey;
|
||||
@@ -166,6 +167,7 @@ macro_rules! specialize_requests {
|
||||
}
|
||||
|
||||
specialize_requests! {
|
||||
fn request_authorities() -> Vec<AuthorityDiscoveryId>; Authorities;
|
||||
fn request_validators() -> Vec<ValidatorId>; Validators;
|
||||
fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
|
||||
fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
|
||||
@@ -247,6 +249,7 @@ macro_rules! specialize_requests_ctx {
|
||||
}
|
||||
|
||||
specialize_requests_ctx! {
|
||||
fn request_authorities_ctx() -> Vec<AuthorityDiscoveryId>; Authorities;
|
||||
fn request_validators_ctx() -> Vec<ValidatorId>; Validators;
|
||||
fn request_validator_groups_ctx() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
|
||||
fn request_availability_cores_ctx() -> Vec<CoreState>; AvailabilityCores;
|
||||
|
||||
@@ -437,6 +437,8 @@ pub type RuntimeApiSender<T> = oneshot::Sender<Result<T, crate::errors::RuntimeA
|
||||
/// A request to the Runtime API subsystem.
|
||||
#[derive(Debug)]
|
||||
pub enum RuntimeApiRequest {
|
||||
/// Get the next, current and some previous authority discovery set deduplicated.
|
||||
Authorities(RuntimeApiSender<Vec<AuthorityDiscoveryId>>),
|
||||
/// Get the current validator set.
|
||||
Validators(RuntimeApiSender<Vec<ValidatorId>>),
|
||||
/// Get the validator groups and group rotation info.
|
||||
|
||||
Reference in New Issue
Block a user