mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 08:21:05 +00:00
Refactor and fix usage of get_session_index() and get_session_info_by_index() (#4735)
* Rename/refactor around get_session_index Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * choose proper head for fetching session Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * revert rename Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * fix comments Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * renaming and more comments Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * review feedback Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Run Fetch task in correct session Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Log warning when ancestors unavailable Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Fixes Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * fix Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
This commit is contained in:
@@ -106,14 +106,13 @@ impl Requester {
|
||||
{
|
||||
tracing::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
|
||||
let ActiveLeavesUpdate { activated, deactivated } = update;
|
||||
// Order important! We need to handle activated, prior to deactivated, otherwise we might
|
||||
// cancel still needed jobs.
|
||||
if let Some(activated) = activated {
|
||||
// Stale leaves happen after a reversion - we don't want to re-run availability there.
|
||||
if let LeafStatus::Fresh = activated.status {
|
||||
self.start_requesting_chunks(ctx, runtime, activated).await?;
|
||||
}
|
||||
// Stale leaves happen after a reversion - we don't want to re-run availability there.
|
||||
if let Some(leaf) = activated.filter(|leaf| leaf.status == LeafStatus::Fresh) {
|
||||
// Order important! We need to handle activated, prior to deactivated, otherwise we might
|
||||
// cancel still needed jobs.
|
||||
self.start_requesting_chunks(ctx, runtime, leaf).await?;
|
||||
}
|
||||
|
||||
self.stop_requesting_chunks(deactivated.into_iter());
|
||||
Ok(())
|
||||
}
|
||||
@@ -212,15 +211,24 @@ impl Requester {
|
||||
.with_session_info(
|
||||
ctx,
|
||||
runtime,
|
||||
// We use leaf here, as relay_parent must be in the same session as the
|
||||
// leaf. (Cores are dropped at session boundaries.) At the same time,
|
||||
// only leaves are guaranteed to be fetchable by the state trie.
|
||||
// We use leaf here, the relay_parent must be in the same session as the
|
||||
// leaf. This is guaranteed by runtime which ensures that cores are cleared
|
||||
// at session boundaries. At the same time, only leaves are guaranteed to
|
||||
// be fetchable by the state trie.
|
||||
leaf,
|
||||
|info| FetchTaskConfig::new(leaf, &core, tx, metrics, info),
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|err| {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
error = ?err,
|
||||
"Failed to spawn a fetch task"
|
||||
);
|
||||
err
|
||||
});
|
||||
|
||||
if let Some(task_cfg) = task_cfg {
|
||||
if let Ok(Some(task_cfg)) = task_cfg {
|
||||
e.insert(FetchTask::start(task_cfg, ctx).await?);
|
||||
}
|
||||
// Not a validator, nothing to do.
|
||||
@@ -274,7 +282,7 @@ where
|
||||
|
||||
// `head` is the child of the first block in `ancestors`, request its session index.
|
||||
let head_session_index = match ancestors_iter.next() {
|
||||
Some(parent) => runtime.get_session_index(ctx.sender(), *parent).await?,
|
||||
Some(parent) => runtime.get_session_index_for_child(ctx.sender(), *parent).await?,
|
||||
None => {
|
||||
// No first element, i.e. empty.
|
||||
return Ok(ancestors)
|
||||
@@ -285,7 +293,7 @@ where
|
||||
// The first parent is skipped.
|
||||
for parent in ancestors_iter {
|
||||
// Parent is the i-th ancestor, request session index for its child -- (i-1)th element.
|
||||
let session_index = runtime.get_session_index(ctx.sender(), *parent).await?;
|
||||
let session_index = runtime.get_session_index_for_child(ctx.sender(), *parent).await?;
|
||||
if session_index == head_session_index {
|
||||
session_ancestry_len += 1;
|
||||
} else {
|
||||
|
||||
@@ -105,7 +105,7 @@ impl SessionCache {
|
||||
Context: SubsystemContext,
|
||||
F: FnOnce(&SessionInfo) -> R,
|
||||
{
|
||||
let session_index = runtime.get_session_index(ctx.sender(), parent).await?;
|
||||
let session_index = runtime.get_session_index_for_child(ctx.sender(), parent).await?;
|
||||
|
||||
if let Some(o_info) = self.session_info_cache.get(&session_index) {
|
||||
tracing::trace!(target: LOG_TARGET, session_index, "Got session from lru");
|
||||
@@ -177,13 +177,15 @@ impl SessionCache {
|
||||
&self,
|
||||
ctx: &mut Context,
|
||||
runtime: &mut RuntimeInfo,
|
||||
parent: Hash,
|
||||
relay_parent: Hash,
|
||||
session_index: SessionIndex,
|
||||
) -> Result<Option<SessionInfo>, Error>
|
||||
where
|
||||
Context: SubsystemContext,
|
||||
{
|
||||
let info = runtime.get_session_info_by_index(ctx.sender(), parent, session_index).await?;
|
||||
let info = runtime
|
||||
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
|
||||
.await?;
|
||||
|
||||
let discovery_keys = info.session_info.discovery_keys.clone();
|
||||
let mut validator_groups = info.session_info.validator_groups.clone();
|
||||
|
||||
@@ -25,6 +25,7 @@ use polkadot_primitives::{
|
||||
v1::{BlockNumber, CoreState, GroupIndex, Hash, Id, ScheduledCore, SessionIndex},
|
||||
v2::SessionInfo,
|
||||
};
|
||||
|
||||
use polkadot_subsystem::{
|
||||
messages::{
|
||||
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
|
||||
|
||||
@@ -434,7 +434,7 @@ where
|
||||
Context: SubsystemContext<Message = CollatorProtocolMessage>,
|
||||
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
|
||||
{
|
||||
let session_index = runtime.get_session_index(ctx.sender(), relay_parent).await?;
|
||||
let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
|
||||
let info = &runtime
|
||||
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
|
||||
.await?
|
||||
|
||||
@@ -190,8 +190,26 @@ impl DisputeSender {
|
||||
dispute: (SessionIndex, CandidateHash),
|
||||
) -> Result<()> {
|
||||
let (session_index, candidate_hash) = dispute;
|
||||
// We need some relay chain head for context for receiving session info information:
|
||||
let ref_head = self.active_sessions.values().next().ok_or(NonFatal::NoActiveHeads)?;
|
||||
// A relay chain head is required as context for receiving session info information from runtime and
|
||||
// storage. We will iterate `active_sessions` to find a suitable head. We assume that there is at
|
||||
// least one active head which, by `session_index`, is at least as recent as the `dispute` passed in.
|
||||
// We need to avoid picking an older one from a session that might not yet exist in storage.
|
||||
// Related to <https://github.com/paritytech/polkadot/issues/4730> .
|
||||
let ref_head = self
|
||||
.active_sessions
|
||||
.iter()
|
||||
.find_map(|(active_session_index, head_hash)| {
|
||||
// There might be more than one session index that is at least as recent as the dispute
|
||||
// so we just pick the first one. Keep in mind we are talking about the session index for the
|
||||
// child of block identified by `head_hash` and not the session index for the block.
|
||||
if active_session_index >= &session_index {
|
||||
Some(head_hash)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.ok_or(NonFatal::NoActiveHeads)?;
|
||||
|
||||
let info = runtime
|
||||
.get_session_info_by_index(ctx.sender(), *ref_head, session_index)
|
||||
.await?;
|
||||
@@ -293,7 +311,7 @@ impl DisputeSender {
|
||||
ctx: &mut Context,
|
||||
runtime: &mut RuntimeInfo,
|
||||
) -> Result<bool> {
|
||||
let new_sessions = get_active_session_indeces(ctx, runtime, &self.active_heads).await?;
|
||||
let new_sessions = get_active_session_indices(ctx, runtime, &self.active_heads).await?;
|
||||
let new_sessions_raw: HashSet<_> = new_sessions.keys().collect();
|
||||
let old_sessions_raw: HashSet<_> = self.active_sessions.keys().collect();
|
||||
let updated = new_sessions_raw != old_sessions_raw;
|
||||
@@ -306,14 +324,15 @@ impl DisputeSender {
|
||||
/// Retrieve the currently active sessions.
|
||||
///
|
||||
/// List is all indices of all active sessions together with the head that was used for the query.
|
||||
async fn get_active_session_indeces<Context: SubsystemContext>(
|
||||
async fn get_active_session_indices<Context: SubsystemContext>(
|
||||
ctx: &mut Context,
|
||||
runtime: &mut RuntimeInfo,
|
||||
active_heads: &Vec<Hash>,
|
||||
) -> Result<HashMap<SessionIndex, Hash>> {
|
||||
let mut indeces = HashMap::new();
|
||||
// Iterate all heads we track as active and fetch the child' session indices.
|
||||
for head in active_heads {
|
||||
let session_index = runtime.get_session_index(ctx.sender(), *head).await?;
|
||||
let session_index = runtime.get_session_index_for_child(ctx.sender(), *head).await?;
|
||||
indeces.insert(session_index, *head);
|
||||
}
|
||||
Ok(indeces)
|
||||
|
||||
@@ -204,7 +204,8 @@ impl SendTask {
|
||||
active_sessions: &HashMap<SessionIndex, Hash>,
|
||||
) -> Result<HashSet<AuthorityDiscoveryId>> {
|
||||
let ref_head = self.request.0.candidate_receipt.descriptor.relay_parent;
|
||||
// Parachain validators:
|
||||
// Retrieve all authorities which participated in the parachain consensus of the session
|
||||
// in which the candidate was backed.
|
||||
let info = runtime
|
||||
.get_session_info_by_index(ctx.sender(), ref_head, self.request.0.session_index)
|
||||
.await?;
|
||||
@@ -219,7 +220,8 @@ impl SendTask {
|
||||
.map(|(_, v)| v.clone())
|
||||
.collect();
|
||||
|
||||
// Current authorities:
|
||||
// Retrieve all authorities for the current session as indicated by the active
|
||||
// heads we are tracking.
|
||||
for (session_index, head) in active_sessions.iter() {
|
||||
let info =
|
||||
runtime.get_session_info_by_index(ctx.sender(), *head, *session_index).await?;
|
||||
|
||||
@@ -624,9 +624,9 @@ struct ActiveHeadData {
|
||||
statements: IndexMap<StoredStatementComparator, SignedFullStatement>,
|
||||
/// Large statements we are waiting for with associated meta data.
|
||||
waiting_large_statements: HashMap<CandidateHash, LargeStatementStatus>,
|
||||
/// The validators at this head.
|
||||
/// The parachain validators at the head's child session index.
|
||||
validators: Vec<ValidatorId>,
|
||||
/// The session index this head is at.
|
||||
/// The current session index of this fork.
|
||||
session_index: sp_staking::SessionIndex,
|
||||
/// How many `Seconded` statements we've seen per validator.
|
||||
seconded_counts: HashMap<ValidatorIndex, usize>,
|
||||
@@ -1798,8 +1798,9 @@ impl StatementDistributionSubsystem {
|
||||
"New active leaf",
|
||||
);
|
||||
|
||||
// Retrieve the parachain validators at the child of the head we track.
|
||||
let session_index =
|
||||
runtime.get_session_index(ctx.sender(), relay_parent).await?;
|
||||
runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
|
||||
let info = runtime
|
||||
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
|
||||
.await?;
|
||||
|
||||
@@ -40,7 +40,7 @@ pub use polkadot_node_jaeger as jaeger;
|
||||
const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8;
|
||||
|
||||
/// The status of an activated leaf.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum LeafStatus {
|
||||
/// A leaf is fresh when it's the first time the leaf has been encountered.
|
||||
/// Most leaves should be fresh.
|
||||
|
||||
@@ -102,7 +102,7 @@ impl RollingSessionWindow {
|
||||
window_size: SessionWindowSize,
|
||||
block_hash: Hash,
|
||||
) -> Result<Self, SessionsUnavailable> {
|
||||
let session_index = get_session_index_for_head(ctx, block_hash).await?;
|
||||
let session_index = get_session_index_for_child(ctx, block_hash).await?;
|
||||
|
||||
let window_start = session_index.saturating_sub(window_size.get() - 1);
|
||||
|
||||
@@ -160,7 +160,7 @@ impl RollingSessionWindow {
|
||||
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
|
||||
block_hash: Hash,
|
||||
) -> Result<SessionWindowUpdate, SessionsUnavailable> {
|
||||
let session_index = get_session_index_for_head(ctx, block_hash).await?;
|
||||
let session_index = get_session_index_for_child(ctx, block_hash).await?;
|
||||
|
||||
let old_window_start = self.earliest_session;
|
||||
|
||||
@@ -212,7 +212,12 @@ impl RollingSessionWindow {
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_session_index_for_head(
|
||||
// Returns the session index expected at any child of the `parent` block.
|
||||
//
|
||||
// Note: We could use `RuntimeInfo::get_session_index_for_child` here but it's
|
||||
// cleaner to just call the runtime API directly without needing to create an instance
|
||||
// of `RuntimeInfo`.
|
||||
async fn get_session_index_for_child(
|
||||
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
|
||||
block_hash: Hash,
|
||||
) -> Result<SessionIndex, SessionsUnavailable> {
|
||||
|
||||
@@ -117,8 +117,9 @@ impl RuntimeInfo {
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve the current session index.
|
||||
pub async fn get_session_index<Sender>(
|
||||
/// Returns the session index expected at any child of the `parent` block.
|
||||
/// This does not return the session index for the `parent` block.
|
||||
pub async fn get_session_index_for_child<Sender>(
|
||||
&mut self,
|
||||
sender: &mut Sender,
|
||||
parent: Hash,
|
||||
@@ -141,14 +142,14 @@ impl RuntimeInfo {
|
||||
pub async fn get_session_info<'a, Sender>(
|
||||
&'a mut self,
|
||||
sender: &mut Sender,
|
||||
parent: Hash,
|
||||
relay_parent: Hash,
|
||||
) -> Result<&'a ExtendedSessionInfo>
|
||||
where
|
||||
Sender: SubsystemSender,
|
||||
{
|
||||
let session_index = self.get_session_index(sender, parent).await?;
|
||||
let session_index = self.get_session_index_for_child(sender, relay_parent).await?;
|
||||
|
||||
self.get_session_info_by_index(sender, parent, session_index).await
|
||||
self.get_session_info_by_index(sender, relay_parent, session_index).await
|
||||
}
|
||||
|
||||
/// Get `ExtendedSessionInfo` by session index.
|
||||
@@ -185,7 +186,7 @@ impl RuntimeInfo {
|
||||
pub async fn check_signature<Sender, Payload, RealPayload>(
|
||||
&mut self,
|
||||
sender: &mut Sender,
|
||||
parent: Hash,
|
||||
relay_parent: Hash,
|
||||
signed: UncheckedSigned<Payload, RealPayload>,
|
||||
) -> Result<
|
||||
std::result::Result<Signed<Payload, RealPayload>, UncheckedSigned<Payload, RealPayload>>,
|
||||
@@ -195,9 +196,9 @@ impl RuntimeInfo {
|
||||
Payload: EncodeAs<RealPayload> + Clone,
|
||||
RealPayload: Encode + Clone,
|
||||
{
|
||||
let session_index = self.get_session_index(sender, parent).await?;
|
||||
let info = self.get_session_info_by_index(sender, parent, session_index).await?;
|
||||
Ok(check_signature(session_index, &info.session_info, parent, signed))
|
||||
let session_index = self.get_session_index_for_child(sender, relay_parent).await?;
|
||||
let info = self.get_session_info_by_index(sender, relay_parent, session_index).await?;
|
||||
Ok(check_signature(session_index, &info.session_info, relay_parent, signed))
|
||||
}
|
||||
|
||||
/// Build `ValidatorInfo` for the current session.
|
||||
|
||||
Reference in New Issue
Block a user