availability-distribution: look for leaf ancestors within the same session (#4596)

* availability-distribution: look for leaf ancestors

* Re-use subsystem-util

* Rework ancestry tasks scheduling

* Requester tests

* Improve readability for ancestors lookup
This commit is contained in:
Chris Sosnin
2022-01-26 07:52:07 +03:00
committed by GitHub
parent 922eb606c3
commit 0e2eb6d26c
5 changed files with 465 additions and 19 deletions
@@ -24,7 +24,7 @@ use thiserror::Error;
use futures::channel::oneshot;
use polkadot_node_subsystem_util::runtime;
use polkadot_subsystem::SubsystemError;
use polkadot_subsystem::{ChainApiError, SubsystemError};
use crate::LOG_TARGET;
@@ -63,6 +63,12 @@ pub enum Fatal {
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information: {0}")]
Runtime(#[from] runtime::Fatal),
#[error("Oneshot for receiving response from Chain API got cancelled")]
ChainApiSenderDropped(#[source] oneshot::Canceled),
#[error("Retrieving response from Chain API unexpectedly failed with error: {0}")]
ChainApi(#[from] ChainApiError),
}
/// Non-fatal errors of this subsystem.
@@ -64,7 +64,7 @@ pub struct FetchTask {
/// In other words, for which relay chain parents this candidate is considered live.
/// This is updated on every `ActiveLeavesUpdate` and enables us to know when we can safely
/// stop keeping track of that candidate/chunk.
live_in: HashSet<Hash>,
pub(crate) live_in: HashSet<Hash>,
/// We keep the task around in until `live_in` becomes empty, to make
/// sure we won't re-fetch an already fetched candidate.
@@ -27,7 +27,7 @@ use std::{
};
use futures::{
channel::mpsc,
channel::{mpsc, oneshot},
task::{Context, Poll},
Stream,
};
@@ -35,10 +35,15 @@ use futures::{
use polkadot_node_subsystem_util::runtime::{get_occupied_cores, RuntimeInfo};
use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore};
use polkadot_subsystem::{
messages::AllMessages, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
messages::{AllMessages, ChainApiMessage},
ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
};
use super::{Metrics, LOG_TARGET};
use super::{Metrics, Result, LOG_TARGET};
use crate::error::Fatal;
#[cfg(test)]
mod tests;
/// Cache for session information.
mod session_cache;
@@ -75,6 +80,9 @@ pub struct Requester {
}
impl Requester {
/// How many ancestors of the leaf should we consider along with it.
pub(crate) const LEAF_ANCESTRY_LEN_WITHIN_SESSION: usize = 3;
/// Create a new `Requester`.
///
/// You must feed it with `ActiveLeavesUpdate` via `update_fetching_heads` and make it progress
@@ -83,6 +91,7 @@ impl Requester {
let (tx, rx) = mpsc::channel(1);
Requester { fetches: HashMap::new(), session_cache: SessionCache::new(), tx, rx, metrics }
}
/// Update heads that need availability distribution.
///
/// For all active heads we will be fetching our chunks for availability distribution.
@@ -91,43 +100,72 @@ impl Requester {
ctx: &mut Context,
runtime: &mut RuntimeInfo,
update: ActiveLeavesUpdate,
) -> super::Result<()>
) -> Result<()>
where
Context: SubsystemContext,
{
tracing::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
let ActiveLeavesUpdate { activated, deactivated } = update;
// Stale leaves happen after a reversion - we don't want to re-run availability there.
let activated = activated.and_then(|h| match h.status {
LeafStatus::Stale => None,
LeafStatus::Fresh => Some(h),
});
// Order important! We need to handle activated, prior to deactivated, otherwise we might
// cancel still needed jobs.
self.start_requesting_chunks(ctx, runtime, activated.into_iter()).await?;
if let Some(activated) = activated {
// Stale leaves happen after a reversion - we don't want to re-run availability there.
if let LeafStatus::Fresh = activated.status {
self.start_requesting_chunks(ctx, runtime, activated).await?;
}
}
self.stop_requesting_chunks(deactivated.into_iter());
Ok(())
}
/// Start requesting chunks for newly imported heads.
/// Start requesting chunks for newly imported head.
///
/// This will also request [`SESSION_ANCESTRY_LEN`] leaf ancestors from the same session
/// and start requesting chunks for them too.
async fn start_requesting_chunks<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
new_heads: impl Iterator<Item = ActivatedLeaf>,
) -> super::Result<()>
new_head: ActivatedLeaf,
) -> Result<()>
where
Context: SubsystemContext,
{
for ActivatedLeaf { hash: leaf, .. } in new_heads {
let cores = get_occupied_cores(ctx, leaf).await?;
let ActivatedLeaf { hash: leaf, .. } = new_head;
let ancestors_in_session = get_block_ancestors_in_same_session(
ctx,
runtime,
leaf,
Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION,
)
.await
.unwrap_or_else(|err| {
tracing::debug!(
target: LOG_TARGET,
leaf = ?leaf,
"Failed to fetch leaf ancestors in the same session due to an error: {}",
err
);
Vec::new()
});
// Also spawn or bump tasks for candidates in ancestry in the same session.
for hash in std::iter::once(leaf).chain(ancestors_in_session) {
let cores = get_occupied_cores(ctx, hash).await?;
tracing::trace!(
target: LOG_TARGET,
occupied_cores = ?cores,
"Query occupied core"
);
// Important:
// We mark the whole ancestry as live in the **leaf** hash, so we don't need to track
// any tasks separately.
//
// The next time the subsystem receives leaf update, some of spawned task will be bumped
// to be live in fresh relay parent, while some might get dropped due to the current leaf
// being deactivated.
self.add_cores(ctx, runtime, leaf, cores).await?;
}
Ok(())
}
@@ -154,7 +192,7 @@ impl Requester {
runtime: &mut RuntimeInfo,
leaf: Hash,
cores: impl IntoIterator<Item = OccupiedCore>,
) -> super::Result<()>
) -> Result<()>
where
Context: SubsystemContext,
{
@@ -215,3 +253,69 @@ impl Stream for Requester {
}
}
}
/// Requests up to `limit` ancestor hashes of relay parent in the same session.
async fn get_block_ancestors_in_same_session<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
head: Hash,
limit: usize,
) -> Result<Vec<Hash>>
where
Context: SubsystemContext,
{
// The order is parent, grandparent, ...
//
// `limit + 1` since a session index for the last element in ancestry
// is obtained through its parent. It always gets truncated because
// `session_ancestry_len` can only be incremented `ancestors.len() - 1` times.
let mut ancestors = get_block_ancestors(ctx, head, limit + 1).await?;
let mut ancestors_iter = ancestors.iter();
// `head` is the child of the first block in `ancestors`, request its session index.
let head_session_index = match ancestors_iter.next() {
Some(parent) => runtime.get_session_index(ctx.sender(), *parent).await?,
None => {
// No first element, i.e. empty.
return Ok(ancestors)
},
};
let mut session_ancestry_len = 0;
// The first parent is skipped.
for parent in ancestors_iter {
// Parent is the i-th ancestor, request session index for its child -- (i-1)th element.
let session_index = runtime.get_session_index(ctx.sender(), *parent).await?;
if session_index == head_session_index {
session_ancestry_len += 1;
} else {
break
}
}
// Drop the rest.
ancestors.truncate(session_ancestry_len);
Ok(ancestors)
}
/// Request up to `limit` ancestor hashes of relay parent from the Chain API.
async fn get_block_ancestors<Context>(
ctx: &mut Context,
relay_parent: Hash,
limit: usize,
) -> Result<Vec<Hash>>
where
Context: SubsystemContext,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::Ancestors {
hash: relay_parent,
k: limit,
response_channel: tx,
})
.await;
let ancestors = rx.await.map_err(Fatal::ChainApiSenderDropped)?.map_err(Fatal::ChainApi)?;
Ok(ancestors)
}
@@ -0,0 +1,336 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{future::Future, sync::Arc};
use futures::FutureExt;
use polkadot_node_network_protocol::jaeger;
use polkadot_node_primitives::{BlockData, ErasureChunk, PoV, SpawnNamed};
use polkadot_node_subsystem_util::runtime::RuntimeInfo;
use polkadot_primitives::v1::{
BlockNumber, CoreState, GroupIndex, Hash, Id, ScheduledCore, SessionIndex, SessionInfo,
};
use polkadot_subsystem::{
messages::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
},
ActivatedLeaf, ActiveLeavesUpdate, LeafStatus,
};
use polkadot_subsystem_testhelpers::{
make_subsystem_context, mock::make_ferdie_keystore, TestSubsystemContext,
TestSubsystemContextHandle,
};
use sp_core::testing::TaskExecutor;
use crate::tests::mock::{get_valid_chunk_data, make_session_info, OccupiedCoreBuilder};
use super::Requester;
fn get_erasure_chunk() -> ErasureChunk {
let pov = PoV { block_data: BlockData(vec![45, 46, 47]) };
get_valid_chunk_data(pov).1
}
#[derive(Clone)]
struct TestState {
/// Simulated relay chain heads. For each block except genesis
/// there exists a single corresponding candidate, handled in [`spawn_virtual_overseer`].
pub relay_chain: Vec<Hash>,
pub session_info: SessionInfo,
// Defines a way to compute a session index for the block with
// a given number. Returns 1 for all blocks by default.
pub session_index_for_block: fn(BlockNumber) -> SessionIndex,
}
impl TestState {
fn new() -> Self {
let relay_chain: Vec<_> = (0u8..10).map(Hash::repeat_byte).collect();
let session_info = make_session_info();
let session_index_for_block = |_| 1;
Self { relay_chain, session_info, session_index_for_block }
}
}
fn spawn_virtual_overseer(
pool: TaskExecutor,
test_state: TestState,
mut ctx_handle: TestSubsystemContextHandle<AvailabilityDistributionMessage>,
) {
pool.spawn(
"virtual-overseer",
None,
async move {
loop {
let msg = ctx_handle.try_recv().await;
if msg.is_none() {
break
}
match msg.unwrap() {
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(..)) => {},
AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryChunk(
..,
tx,
)) => {
let chunk = get_erasure_chunk();
tx.send(Some(chunk)).expect("Receiver is expected to be alive");
},
AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreChunk {
tx,
..
}) => {
// Silently accept it.
tx.send(Ok(())).expect("Receiver is expected to be alive");
},
AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, req)) => {
match req {
RuntimeApiRequest::SessionIndexForChild(tx) => {
let chain = &test_state.relay_chain;
let block_number = chain
.iter()
.position(|h| *h == hash)
.expect("Invalid session index request");
// Compute session index.
let session_index_for_block = test_state.session_index_for_block;
tx.send(Ok(session_index_for_block(block_number as u32 + 1)))
.expect("Receiver should still be alive");
},
RuntimeApiRequest::SessionInfo(_, tx) => {
tx.send(Ok(Some(test_state.session_info.clone())))
.expect("Receiver should be alive.");
},
RuntimeApiRequest::AvailabilityCores(tx) => {
let para_id = Id::from(1);
let maybe_block_position =
test_state.relay_chain.iter().position(|h| *h == hash);
let cores = match maybe_block_position {
Some(block_num) => {
let core = if block_num == 0 {
CoreState::Scheduled(ScheduledCore {
para_id,
collator: None,
})
} else {
CoreState::Occupied(
OccupiedCoreBuilder {
group_responsible: GroupIndex(1),
para_id,
relay_parent: hash,
}
.build()
.0,
)
};
vec![core]
},
None => Vec::new(),
};
tx.send(Ok(cores)).expect("Receiver should be alive.")
},
_ => {
panic!("Unexpected runtime request: {:?}", req);
},
}
},
AllMessages::ChainApi(ChainApiMessage::Ancestors {
hash,
k,
response_channel,
}) => {
let chain = &test_state.relay_chain;
let maybe_block_position = chain.iter().position(|h| *h == hash);
let ancestors = maybe_block_position
.map(|idx| chain[..idx].iter().rev().take(k).copied().collect())
.unwrap_or_default();
response_channel
.send(Ok(ancestors))
.expect("Receiver is expected to be alive");
},
msg => panic!("Unexpected overseer message: {:?}", msg),
}
}
}
.boxed(),
);
}
fn test_harness<T: Future<Output = ()>>(
test_state: TestState,
test_fx: impl FnOnce(TestSubsystemContext<AvailabilityDistributionMessage, TaskExecutor>) -> T,
) {
let pool = TaskExecutor::new();
let (ctx, ctx_handle) = make_subsystem_context(pool.clone());
spawn_virtual_overseer(pool, test_state, ctx_handle);
futures::executor::block_on(test_fx(ctx));
}
#[test]
fn check_ancestry_lookup_in_same_session() {
let test_state = TestState::new();
let mut requester = Requester::new(Default::default());
let keystore = make_ferdie_keystore();
let mut runtime = RuntimeInfo::new(Some(keystore));
test_harness(test_state.clone(), |mut ctx| async move {
let chain = &test_state.relay_chain;
let block_number = 1;
let update = ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: chain[block_number],
number: block_number as u32,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
deactivated: Vec::new().into(),
};
requester
.update_fetching_heads(&mut ctx, &mut runtime, update)
.await
.expect("Leaf processing failed");
let fetch_tasks = &requester.fetches;
assert_eq!(fetch_tasks.len(), 1);
let block_1_candidate =
*fetch_tasks.keys().next().expect("A task is checked to be present; qed");
let block_number = 2;
let update = ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: chain[block_number],
number: block_number as u32,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
deactivated: Vec::new().into(),
};
requester
.update_fetching_heads(&mut ctx, &mut runtime, update)
.await
.expect("Leaf processing failed");
let fetch_tasks = &requester.fetches;
assert_eq!(fetch_tasks.len(), 2);
let task = fetch_tasks.get(&block_1_candidate).expect("Leaf hasn't been deactivated yet");
// The task should be live in both blocks 1 and 2.
assert_eq!(task.live_in.len(), 2);
let block_2_candidate = *fetch_tasks
.keys()
.find(|hash| **hash != block_1_candidate)
.expect("Two tasks are present, the first one corresponds to block 1 candidate; qed");
// Deactivate both blocks but keep the second task as a
// part of ancestry.
let block_number = 2 + Requester::LEAF_ANCESTRY_LEN_WITHIN_SESSION;
let update = ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: test_state.relay_chain[block_number],
number: block_number as u32,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
deactivated: vec![chain[1], chain[2]].into(),
};
requester
.update_fetching_heads(&mut ctx, &mut runtime, update)
.await
.expect("Leaf processing failed");
let fetch_tasks = &requester.fetches;
// The leaf + K its ancestors.
assert_eq!(fetch_tasks.len(), Requester::LEAF_ANCESTRY_LEN_WITHIN_SESSION + 1);
let block_2_task = fetch_tasks
.get(&block_2_candidate)
.expect("Expected to be live as a part of ancestry");
assert_eq!(block_2_task.live_in.len(), 1);
});
}
#[test]
fn check_ancestry_lookup_in_different_sessions() {
let mut test_state = TestState::new();
let mut requester = Requester::new(Default::default());
let keystore = make_ferdie_keystore();
let mut runtime = RuntimeInfo::new(Some(keystore));
test_state.session_index_for_block = |block_number| match block_number {
0..=3 => 1,
_ => 2,
};
test_harness(test_state.clone(), |mut ctx| async move {
let chain = &test_state.relay_chain;
let block_number = 3;
let update = ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: chain[block_number],
number: block_number as u32,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
deactivated: Vec::new().into(),
};
requester
.update_fetching_heads(&mut ctx, &mut runtime, update)
.await
.expect("Leaf processing failed");
let fetch_tasks = &requester.fetches;
assert_eq!(fetch_tasks.len(), 3.min(Requester::LEAF_ANCESTRY_LEN_WITHIN_SESSION + 1));
let block_number = 4;
let update = ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: chain[block_number],
number: block_number as u32,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
deactivated: vec![chain[1], chain[2], chain[3]].into(),
};
requester
.update_fetching_heads(&mut ctx, &mut runtime, update)
.await
.expect("Leaf processing failed");
let fetch_tasks = &requester.fetches;
assert_eq!(fetch_tasks.len(), 1);
let block_number = 5;
let update = ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: chain[block_number],
number: block_number as u32,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
deactivated: vec![chain[4]].into(),
};
requester
.update_fetching_heads(&mut ctx, &mut runtime, update)
.await
.expect("Leaf processing failed");
let fetch_tasks = &requester.fetches;
assert_eq!(fetch_tasks.len(), 2.min(Requester::LEAF_ANCESTRY_LEN_WITHIN_SESSION + 1));
});
}
+1 -1
View File
@@ -40,7 +40,7 @@ pub use polkadot_node_jaeger as jaeger;
const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8;
/// The status of an activated leaf.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum LeafStatus {
/// A leaf is fresh when it's the first time the leaf has been encountered.
/// Most leaves should be fresh.