// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Requester takes care of requesting erasure chunks for candidates that are pending
//! availability.
use std::{
collections::{
hash_map::{Entry, HashMap},
hash_set::HashSet,
},
iter::IntoIterator,
pin::Pin,
};
use futures::{
channel::{mpsc, oneshot},
task::{Context, Poll},
Stream,
};
use polkadot_node_subsystem::{
jaeger,
messages::{ChainApiMessage, RuntimeApiMessage},
overseer, ActivatedLeaf, ActiveLeavesUpdate,
};
use polkadot_node_subsystem_util::runtime::{get_occupied_cores, RuntimeInfo};
use polkadot_primitives::{CandidateHash, Hash, OccupiedCore, SessionIndex};
use super::{FatalError, Metrics, Result, LOG_TARGET};
#[cfg(test)]
mod tests;
/// Cache for session information.
mod session_cache;
use session_cache::SessionCache;
/// A task fetching a particular chunk.
mod fetch_task;
use fetch_task::{FetchTask, FetchTaskConfig, FromFetchTask};
/// Requester takes care of requesting erasure chunks from backing groups and stores them in the
/// av store.
///
/// It implements a stream that needs to be advanced for it to make progress.
pub struct Requester {
/// Candidates we need to fetch our chunk for, keyed by candidate hash.
///
/// We keep those around as long as a candidate is pending availability on some leaf, so we
/// won't fetch chunks multiple times. Entries whose task is no longer live after its
/// deactivated leaves have been removed are dropped in `stop_requesting_chunks`.
///
/// We remove them on failure, so we get retries on the next block still pending availability.
fetches: HashMap,
/// Localized information about sessions we are currently interested in.
///
/// Used in `add_cores` to build the configuration of newly started fetch tasks.
session_cache: SessionCache,
/// Sender to be cloned for `FetchTask`s.
///
/// A clone is handed to the configuration of every fetch task that gets started.
tx: mpsc::Sender,
/// Receive messages from `FetchTask`.
rx: mpsc::Receiver,
/// Prometheus metrics; cloned into the configuration of every fetch task that gets started.
metrics: Metrics,
}
#[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)]
impl Requester {
/// How many ancestors of an activated leaf we consider along with the leaf itself.
///
/// Passed as the ancestry length to `get_block_ancestors_in_same_session` by
/// `start_requesting_chunks`; chunks are requested for the leaf and for every ancestor that
/// call returns.
pub(crate) const LEAF_ANCESTRY_LEN_WITHIN_SESSION: usize = 3;
/// Create a new `Requester`.
///
/// You must feed it with `ActiveLeavesUpdate` via `update_fetching_heads` and make it progress
/// by advancing the stream.
pub fn new(metrics: Metrics) -> Self {
let (tx, rx) = mpsc::channel(1);
Requester { fetches: HashMap::new(), session_cache: SessionCache::new(), tx, rx, metrics }
}
/// Bring the set of running fetches in line with a new view of the active leaves.
///
/// If `update` activates a leaf, chunk requests are started (or bumped) for it first; only
/// afterwards are the deactivated leaves removed from the running tasks.
///
/// `spans` is looked up by the activated leaf's hash to find a parent jaeger span; if there is
/// no entry, a new span is created for that leaf instead.
///
/// # Errors
///
/// Any error from starting the requests is returned immediately; in that case the deactivated
/// leaves of `update` are not processed by this call.
pub async fn update_fetching_heads(
    &mut self,
    ctx: &mut Context,
    runtime: &mut RuntimeInfo,
    update: ActiveLeavesUpdate,
    spans: &HashMap,
) -> Result<()> {
    gum::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
    let ActiveLeavesUpdate { activated: newly_active, deactivated: obsolete } = update;

    // Order important! Activation has to be handled before deactivation, otherwise we might
    // cancel jobs that are still needed.
    if let Some(leaf) = newly_active {
        let base_span = match spans.get(&leaf.hash) {
            Some(leaf_span) => leaf_span.child("update-fetching-heads"),
            None => jaeger::Span::new(&leaf.hash, "update-fetching-heads"),
        };
        let span = base_span
            .with_string_tag("leaf", format!("{:?}", leaf.hash))
            .with_stage(jaeger::Stage::AvailabilityDistribution);
        self.start_requesting_chunks(ctx, runtime, leaf, &span).await?;
    }

    self.stop_requesting_chunks(obsolete.into_iter());
    Ok(())
}
/// Start requesting chunks for newly imported head.
///
/// This will also query ancestors of the leaf within the same session (using an ancestry
/// length of [`Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION`]) and start requesting chunks for them
/// too.
///
/// Occupied cores are queried for the leaf and for each returned ancestor, but every resulting
/// task is registered under the **leaf** hash and the leaf's session index.
///
/// # Errors
///
/// Returns early with the error if querying the ancestors, querying the occupied cores of any
/// block, or adding the cores fails.
async fn start_requesting_chunks(
    &mut self,
    ctx: &mut Context,
    runtime: &mut RuntimeInfo,
    new_head: ActivatedLeaf,
    span: &jaeger::Span,
) -> Result<()> {
    let mut span = span
        .child("request-chunks-new-head")
        .with_string_tag("leaf", format!("{:?}", new_head.hash))
        .with_stage(jaeger::Stage::AvailabilityDistribution);

    let sender = &mut ctx.sender().clone();
    let ActivatedLeaf { hash: leaf, .. } = new_head;
    let (leaf_session_index, ancestors_in_session) = get_block_ancestors_in_same_session(
        sender,
        runtime,
        leaf,
        Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION,
    )
    .await?;
    span.add_uint_tag("ancestors-in-session", ancestors_in_session.len() as u64);

    // Also spawn or bump tasks for candidates in ancestry in the same session.
    for hash in std::iter::once(leaf).chain(ancestors_in_session) {
        let span = span
            .child("request-chunks-ancestor")
            // `format!` only borrows its arguments, so `hash` needs no clone here.
            .with_string_tag("leaf", format!("{:?}", hash))
            .with_stage(jaeger::Stage::AvailabilityDistribution);

        let cores = get_occupied_cores(sender, hash).await?;
        gum::trace!(
            target: LOG_TARGET,
            occupied_cores = ?cores,
            "Query occupied core"
        );

        // Important:
        // We mark the whole ancestry as live in the **leaf** hash, so we don't need to track
        // any tasks separately.
        //
        // The next time the subsystem receives a leaf update, some of the spawned tasks will
        // be bumped to be live in the fresh relay parent, while some might get dropped due to
        // the current leaf being deactivated.
        self.add_cores(ctx, runtime, leaf, leaf_session_index, cores, span).await?;
    }

    Ok(())
}
/// Remove the given obsolete leaves from every running fetch and discard the tasks that are
/// no longer live afterwards.
fn stop_requesting_chunks(&mut self, obsolete_leaves: impl Iterator) {
    // Collected into a set once, then shared by reference with every task.
    let gone = obsolete_leaves.collect::<HashSet<_>>();
    self.fetches.retain(|_candidate, task| {
        task.remove_leaves(&gone);
        task.is_live()
    });
}
/// Add the candidates occupying `cores` as pending availability on `leaf`.
///
/// For a candidate that already has a fetch task, `leaf` is merely added to that task. For any
/// other candidate a task configuration is requested through the session cache and, if one is
/// produced, a new `FetchTask` is started and stored. A failure to obtain the configuration is
/// logged as a warning and the candidate is skipped.
///
/// Note: The passed in `leaf` is not the same as `CandidateDescriptor::relay_parent` in the
/// given cores. The latter is the `relay_parent` this candidate considers its parent, while the
/// passed in leaf might be some later block where the candidate is still pending availability.
///
/// # Errors
///
/// Only an error from starting a `FetchTask` is propagated to the caller.
async fn add_cores(
    &mut self,
    context: &mut Context,
    runtime: &mut RuntimeInfo,
    leaf: Hash,
    leaf_session_index: SessionIndex,
    cores: impl IntoIterator,
    span: jaeger::Span,
) -> Result<()> {
    for core in cores {
        let mut candidate_span = span
            .child("check-fetch-candidate")
            .with_trace_id(core.candidate_hash)
            .with_string_tag("leaf", format!("{:?}", leaf))
            .with_candidate(core.candidate_hash)
            .with_stage(jaeger::Stage::AvailabilityDistribution);

        match self.fetches.entry(core.candidate_hash) {
            // Just book keeping - we are already requesting that chunk:
            Entry::Occupied(mut running) => {
                candidate_span.add_string_tag("already-requested-chunk", "true");
                running.get_mut().add_leaf(leaf);
            },
            Entry::Vacant(slot) => {
                candidate_span.add_string_tag("already-requested-chunk", "false");
                let tx = self.tx.clone();
                let metrics = self.metrics.clone();

                let maybe_cfg = self
                    .session_cache
                    .with_session_info(
                        context,
                        runtime,
                        // We use leaf here, the relay_parent must be in the same session as
                        // the leaf. This is guaranteed by runtime which ensures that cores are
                        // cleared at session boundaries. At the same time, only leaves are
                        // guaranteed to be fetchable by the state trie.
                        leaf,
                        leaf_session_index,
                        |info| FetchTaskConfig::new(leaf, &core, tx, metrics, info, candidate_span),
                    )
                    .await;

                match maybe_cfg {
                    Ok(Some(task_cfg)) => {
                        slot.insert(FetchTask::start(task_cfg, context).await?);
                    },
                    // Not a validator, nothing to do.
                    Ok(None) => {},
                    Err(err) => {
                        gum::warn!(
                            target: LOG_TARGET,
                            error = ?err,
                            "Failed to spawn a fetch task"
                        );
                    },
                }
            },
        }
    }

    Ok(())
}
}
impl Stream for Requester {
type Item = overseer::AvailabilityDistributionOutgoingMessages;
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll