mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-28 17:57:56 +00:00
Add candidate info to OccupiedCore (#2134)
* guide: add candidate information to OccupiedCore * add descriptor and hash to occupied core type * guide: add candidate hash to inclusion * runtime: return candidate info in core state * bitfield signing: stop querying runtime as much * minimize going to runtime in availability distribution * fix availability distribution tests * guide: remove para ID from Occupied core * get all crates compiling
This commit is contained in:
committed by
GitHub
parent
eab86d6f4b
commit
38276b08a4
@@ -36,7 +36,7 @@ use polkadot_node_network_protocol::{
|
||||
};
|
||||
use polkadot_node_subsystem_util::metrics::{self, prometheus};
|
||||
use polkadot_primitives::v1::{
|
||||
BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk, Hash, HashT, Id as ParaId,
|
||||
BlakeTwo256, CoreState, ErasureChunk, Hash, HashT,
|
||||
SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID, CandidateHash,
|
||||
CandidateDescriptor,
|
||||
};
|
||||
@@ -62,11 +62,6 @@ const LOG_TARGET: &'static str = "availability_distribution";
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
enum Error {
|
||||
#[error("Response channel to obtain PendingAvailability failed")]
|
||||
QueryPendingAvailabilityResponseChannel(#[source] oneshot::Canceled),
|
||||
#[error("RuntimeAPI to obtain PendingAvailability failed")]
|
||||
QueryPendingAvailability(#[source] RuntimeApiError),
|
||||
|
||||
#[error("Response channel to obtain StoreChunk failed")]
|
||||
StoreChunkResponseChannel(#[source] oneshot::Canceled),
|
||||
|
||||
@@ -795,19 +790,12 @@ where
|
||||
e => e.or_default(),
|
||||
};
|
||||
|
||||
for para in query_para_ids(ctx, relay_parent).await? {
|
||||
if let Some(ccr) = query_pending_availability(ctx, relay_parent, para).await? {
|
||||
let receipt_hash = ccr.hash();
|
||||
let descriptor = ccr.descriptor().clone();
|
||||
|
||||
// unfortunately we have no good way of telling the candidate was
|
||||
// cached until now. But we don't clobber a `Cached` entry if there
|
||||
// is one already.
|
||||
live_candidates.entry(receipt_hash)
|
||||
.or_insert(FetchedLiveCandidate::Fresh(descriptor));
|
||||
|
||||
receipts_for.insert(receipt_hash);
|
||||
}
|
||||
for (receipt_hash, descriptor) in query_pending_availability(ctx, relay_parent).await? {
|
||||
// unfortunately we have no good way of telling the candidate was
|
||||
// cached until now. But we don't clobber a `Cached` entry if there
|
||||
// is one already.
|
||||
live_candidates.entry(receipt_hash).or_insert(FetchedLiveCandidate::Fresh(descriptor));
|
||||
receipts_for.insert(receipt_hash);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -850,9 +838,10 @@ where
|
||||
Ok((live_candidates, ancestors))
|
||||
}
|
||||
|
||||
/// Query all para IDs that are occupied under a given relay-parent.
|
||||
/// Query all hashes and descriptors of candidates pending availability at a particular block.
|
||||
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
|
||||
async fn query_para_ids<Context>(ctx: &mut Context, relay_parent: Hash) -> Result<Vec<ParaId>>
|
||||
async fn query_pending_availability<Context>(ctx: &mut Context, relay_parent: Hash)
|
||||
-> Result<Vec<(CandidateHash, CandidateDescriptor)>>
|
||||
where
|
||||
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
|
||||
{
|
||||
@@ -863,22 +852,18 @@ where
|
||||
)))
|
||||
.await;
|
||||
|
||||
let all_para_ids = rx
|
||||
let cores: Vec<_> = rx
|
||||
.await
|
||||
.map_err(|e| Error::AvailabilityCoresResponseChannel(e))?
|
||||
.map_err(|e| Error::AvailabilityCores(e))?;
|
||||
|
||||
let occupied_para_ids = all_para_ids
|
||||
.into_iter()
|
||||
.filter_map(|core_state| {
|
||||
if let CoreState::Occupied(occupied) = core_state {
|
||||
Some(occupied.para_id)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
Ok(cores.into_iter()
|
||||
.filter_map(|core_state| if let CoreState::Occupied(occupied) = core_state {
|
||||
Some((occupied.candidate_hash, occupied.candidate_descriptor))
|
||||
} else {
|
||||
None
|
||||
})
|
||||
.collect();
|
||||
Ok(occupied_para_ids)
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Modify the reputation of a peer based on its behavior.
|
||||
@@ -954,27 +939,6 @@ where
|
||||
rx.await.map_err(|e| Error::StoreChunkResponseChannel(e))
|
||||
}
|
||||
|
||||
/// Request the head data for a particular para.
|
||||
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
|
||||
async fn query_pending_availability<Context>(
|
||||
ctx: &mut Context,
|
||||
relay_parent: Hash,
|
||||
para: ParaId,
|
||||
) -> Result<Option<CommittedCandidateReceipt>>
|
||||
where
|
||||
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
|
||||
{
|
||||
let (tx, rx) = oneshot::channel();
|
||||
ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::CandidatePendingAvailability(para, tx),
|
||||
))).await;
|
||||
|
||||
rx.await
|
||||
.map_err(|e| Error::QueryPendingAvailabilityResponseChannel(e))?
|
||||
.map_err(|e| Error::QueryPendingAvailability(e))
|
||||
}
|
||||
|
||||
/// Query the validator set.
|
||||
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
|
||||
async fn query_validators<Context>(
|
||||
|
||||
Reference in New Issue
Block a user