Dispute coordinator: look for included candidates in non-finalized chain (#4508)

* Fetch ancestors of the activated leaf

* Replace param type

* Increase step size

* Request block numbers for ancestors

* Store activated leaves in lru cache

* Fix doc-comment

* Rework LRU usage

* Typos & formatting

* Handle errors better

* Introduce a size limit for the ancestry

* Return fatal error when fetching finalized block

* Update tests

* Add tests for ordering provider

* Better naming

* fix zombienet test, new version

* zombienet add debug

* debug zombienet

Co-authored-by: Javier Viola <javier@parity.io>
This commit is contained in:
Chris Sosnin
2021-12-17 14:07:28 +03:00
committed by GitHub
parent 1eefce2aa9
commit 8b1514862d
7 changed files with 435 additions and 125 deletions
+3 -2
View File
@@ -27,7 +27,8 @@ variables:
CI_IMAGE: "paritytech/ci-linux:production" CI_IMAGE: "paritytech/ci-linux:production"
DOCKER_OS: "debian:stretch" DOCKER_OS: "debian:stretch"
ARCH: "x86_64" ARCH: "x86_64"
ZOMBIENET_IMAGE: "docker.io/paritytech/zombienet" #ZOMBIENET_IMAGE: "docker.io/paritytech/zombienet"
ZOMBIENET_IMAGE: "docker.io/paritypr/zombienet:ec2fa96d"
VAULT_SERVER_URL: "https://vault.parity-mgmt-vault.parity.io" VAULT_SERVER_URL: "https://vault.parity-mgmt-vault.parity.io"
VAULT_AUTH_PATH: "gitlab-parity-io-jwt" VAULT_AUTH_PATH: "gitlab-parity-io-jwt"
VAULT_AUTH_ROLE: "cicd_gitlab_parity_${CI_PROJECT_NAME}" VAULT_AUTH_ROLE: "cicd_gitlab_parity_${CI_PROJECT_NAME}"
@@ -666,7 +667,7 @@ zombienet-tests-malus-dispute-valid:
- echo "${PARACHAINS_IMAGE_NAME} ${PARACHAINS_IMAGE_TAG}" - echo "${PARACHAINS_IMAGE_NAME} ${PARACHAINS_IMAGE_TAG}"
- echo "${MALUS_IMAGE_NAME} ${MALUS_IMAGE_TAG}" - echo "${MALUS_IMAGE_NAME} ${MALUS_IMAGE_TAG}"
- echo "${GH_DIR}" - echo "${GH_DIR}"
- export DEBUG=zombie,zombie::network-node - export DEBUG=zombie*
- export ZOMBIENET_INTEGRATION_TEST_IMAGE=${PARACHAINS_IMAGE_NAME}:${PARACHAINS_IMAGE_TAG} - export ZOMBIENET_INTEGRATION_TEST_IMAGE=${PARACHAINS_IMAGE_NAME}:${PARACHAINS_IMAGE_TAG}
- export MALUS_IMAGE=${MALUS_IMAGE_NAME}:${MALUS_IMAGE_TAG} - export MALUS_IMAGE=${MALUS_IMAGE_NAME}:${MALUS_IMAGE_TAG}
- export COL_IMAGE=${COLLATOR_IMAGE_NAME}:${COLLATOR_IMAGE_TAG} - export COL_IMAGE=${COLLATOR_IMAGE_NAME}:${COLLATOR_IMAGE_TAG}
+1
View File
@@ -6276,6 +6276,7 @@ dependencies = [
"futures 0.3.18", "futures 0.3.18",
"kvdb", "kvdb",
"kvdb-memorydb", "kvdb-memorydb",
"lru 0.7.0",
"parity-scale-codec", "parity-scale-codec",
"polkadot-node-primitives", "polkadot-node-primitives",
"polkadot-node-subsystem", "polkadot-node-subsystem",
@@ -10,6 +10,7 @@ tracing = "0.1.29"
parity-scale-codec = "2" parity-scale-codec = "2"
kvdb = "0.10.0" kvdb = "0.10.0"
thiserror = "1.0.30" thiserror = "1.0.30"
lru = "0.7.0"
polkadot-primitives = { path = "../../../primitives" } polkadot-primitives = { path = "../../../primitives" }
polkadot-node-primitives = { path = "../../primitives" } polkadot-node-primitives = { path = "../../primitives" }
@@ -86,11 +86,11 @@ pub enum Fatal {
#[error("Writing to database failed: {0}")] #[error("Writing to database failed: {0}")]
DbWriteFailed(std::io::Error), DbWriteFailed(std::io::Error),
#[error("Oneshow for receiving block number from chain API got cancelled")] #[error("Oneshot for receiving response from chain API got cancelled")]
CanceledBlockNumber, ChainApiSenderDropped,
#[error("Retrieving block number from chain API failed with error: {0}")] #[error("Retrieving response from chain API unexpectedly failed with error: {0}")]
ChainApiBlockNumber(ChainApiError), ChainApi(#[from] ChainApiError),
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@@ -20,9 +20,10 @@ use std::{
}; };
use futures::channel::oneshot; use futures::channel::oneshot;
use lru::LruCache;
use polkadot_node_subsystem::{ use polkadot_node_subsystem::{
messages::ChainApiMessage, ActivatedLeaf, ActiveLeavesUpdate, SubsystemSender, messages::ChainApiMessage, ActivatedLeaf, ActiveLeavesUpdate, ChainApiError, SubsystemSender,
}; };
use polkadot_node_subsystem_util::runtime::get_candidate_events; use polkadot_node_subsystem_util::runtime::get_candidate_events;
use polkadot_primitives::v1::{BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, Hash}; use polkadot_primitives::v1::{BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, Hash};
@@ -35,6 +36,8 @@ use super::{
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
const LRU_OBSERVED_BLOCKS_CAPACITY: usize = 20;
/// Provider of `CandidateComparator` for candidates. /// Provider of `CandidateComparator` for candidates.
pub struct OrderingProvider { pub struct OrderingProvider {
/// All candidates we have seen included, which not yet have been finalized. /// All candidates we have seen included, which not yet have been finalized.
@@ -43,6 +46,10 @@ pub struct OrderingProvider {
/// ///
/// We need this to clean up `included_candidates` on `ActiveLeavesUpdate`. /// We need this to clean up `included_candidates` on `ActiveLeavesUpdate`.
candidates_by_block_number: BTreeMap<BlockNumber, HashSet<CandidateHash>>, candidates_by_block_number: BTreeMap<BlockNumber, HashSet<CandidateHash>>,
/// Latest relay blocks observed by the provider. We assume that ancestors of
/// cached blocks are already processed, i.e. we have saved corresponding
/// included candidates.
last_observed_blocks: LruCache<Hash, ()>,
} }
/// `Comparator` for ordering of disputes for candidates. /// `Comparator` for ordering of disputes for candidates.
@@ -119,6 +126,11 @@ impl CandidateComparator {
} }
impl OrderingProvider { impl OrderingProvider {
/// Limits the number of ancestors received for a single request.
pub(crate) const ANCESTRY_CHUNK_SIZE: usize = 10;
/// Limits the overall number of ancestors walked through for a given head.
pub(crate) const ANCESTRY_SIZE_LIMIT: usize = 1000;
/// Create a properly initialized `OrderingProvider`. /// Create a properly initialized `OrderingProvider`.
pub async fn new<Sender: SubsystemSender>( pub async fn new<Sender: SubsystemSender>(
sender: &mut Sender, sender: &mut Sender,
@@ -127,6 +139,7 @@ impl OrderingProvider {
let mut s = Self { let mut s = Self {
included_candidates: HashSet::new(), included_candidates: HashSet::new(),
candidates_by_block_number: BTreeMap::new(), candidates_by_block_number: BTreeMap::new(),
last_observed_blocks: LruCache::new(LRU_OBSERVED_BLOCKS_CAPACITY),
}; };
let update = let update =
ActiveLeavesUpdate { activated: Some(initial_head), deactivated: Default::default() }; ActiveLeavesUpdate { activated: Some(initial_head), deactivated: Default::default() };
@@ -171,22 +184,44 @@ impl OrderingProvider {
update: &ActiveLeavesUpdate, update: &ActiveLeavesUpdate,
) -> Result<()> { ) -> Result<()> {
if let Some(activated) = update.activated.as_ref() { if let Some(activated) = update.activated.as_ref() {
// Get included events: // Fetch ancestors of the activated leaf.
let included = get_candidate_events(sender, activated.hash) let ancestors = self
.await? .get_block_ancestors(sender, activated.hash, activated.number)
.into_iter() .await
.filter_map(|ev| match ev { .unwrap_or_else(|err| {
CandidateEvent::CandidateIncluded(receipt, _, _, _) => Some(receipt), tracing::debug!(
_ => None, target: LOG_TARGET,
activated_leaf = ?activated,
"Skipping leaf ancestors due to an error: {}",
err
);
Vec::new()
}); });
for receipt in included { // Ancestors block numbers are consecutive in the descending order.
let candidate_hash = receipt.hash(); let earliest_block_number = activated.number - ancestors.len() as u32;
self.included_candidates.insert(candidate_hash); let block_numbers = (earliest_block_number..=activated.number).rev();
self.candidates_by_block_number
.entry(activated.number) let block_hashes = std::iter::once(activated.hash).chain(ancestors);
.or_default() for (block_num, block_hash) in block_numbers.zip(block_hashes) {
.insert(candidate_hash); // Get included events:
let included = get_candidate_events(sender, block_hash)
.await?
.into_iter()
.filter_map(|ev| match ev {
CandidateEvent::CandidateIncluded(receipt, _, _, _) => Some(receipt),
_ => None,
});
for receipt in included {
let candidate_hash = receipt.hash();
self.included_candidates.insert(candidate_hash);
self.candidates_by_block_number
.entry(block_num)
.or_default()
.insert(candidate_hash);
}
} }
self.last_observed_blocks.put(activated.hash, ());
} }
Ok(()) Ok(())
@@ -205,6 +240,87 @@ impl OrderingProvider {
self.included_candidates.remove(&finalized_candidate); self.included_candidates.remove(&finalized_candidate);
} }
} }
/// Returns ancestors of `head` in the descending order, stopping
/// either at the block present in cache or the latest finalized block.
///
/// Suited specifically for querying non-finalized chains, thus
/// doesn't rely on block numbers.
///
/// Both `head` and last are **not** included in the result.
async fn get_block_ancestors<Sender: SubsystemSender>(
&mut self,
sender: &mut Sender,
mut head: Hash,
mut head_number: BlockNumber,
) -> Result<Vec<Hash>> {
let mut ancestors = Vec::new();
if self.last_observed_blocks.get(&head).is_some() {
return Ok(ancestors)
}
let finalized_block_number = get_finalized_block_number(sender).await?;
loop {
let (tx, rx) = oneshot::channel();
let hashes = {
sender
.send_message(
ChainApiMessage::Ancestors {
hash: head,
k: Self::ANCESTRY_CHUNK_SIZE,
response_channel: tx,
}
.into(),
)
.await;
rx.await.or(Err(Fatal::ChainApiSenderDropped))?.map_err(Fatal::ChainApi)?
};
let earliest_block_number = head_number - hashes.len() as u32;
// The reversed order is parent, grandparent, etc. excluding the head.
let block_numbers = (earliest_block_number..head_number).rev();
for (block_number, hash) in block_numbers.zip(&hashes) {
// Return if we either met finalized/cached block or
// hit the size limit for the returned ancestry of head.
if self.last_observed_blocks.get(hash).is_some() ||
block_number <= finalized_block_number ||
ancestors.len() >= Self::ANCESTRY_SIZE_LIMIT
{
return Ok(ancestors)
}
ancestors.push(*hash);
}
match hashes.last() {
Some(last_hash) => {
head = *last_hash;
head_number = earliest_block_number;
},
None => break,
}
}
return Ok(ancestors)
}
}
async fn send_message_fatal<Sender, Response>(
sender: &mut Sender,
message: ChainApiMessage,
receiver: oneshot::Receiver<std::result::Result<Response, ChainApiError>>,
) -> FatalResult<Response>
where
Sender: SubsystemSender,
{
sender.send_message(message.into()).await;
receiver
.await
.map_err(|_| Fatal::ChainApiSenderDropped)?
.map_err(Fatal::ChainApi)
} }
async fn get_block_number( async fn get_block_number(
@@ -212,9 +328,10 @@ async fn get_block_number(
relay_parent: Hash, relay_parent: Hash,
) -> FatalResult<Option<BlockNumber>> { ) -> FatalResult<Option<BlockNumber>> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
sender.send_message(ChainApiMessage::BlockNumber(relay_parent, tx).into()).await; send_message_fatal(sender, ChainApiMessage::BlockNumber(relay_parent, tx), rx).await
}
rx.await
.map_err(|_| Fatal::CanceledBlockNumber)? async fn get_finalized_block_number(sender: &mut impl SubsystemSender) -> FatalResult<BlockNumber> {
.map_err(Fatal::ChainApiBlockNumber) let (number_tx, number_rx) = oneshot::channel();
send_message_fatal(sender, ChainApiMessage::FinalizedBlockNumber(number_tx), number_rx).await
} }
@@ -14,11 +14,11 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc; use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches; use assert_matches::assert_matches;
use futures::FutureExt; use futures::future::join;
use parity_scale_codec::Encode; use parity_scale_codec::Encode;
use sp_core::testing::TaskExecutor; use sp_core::testing::TaskExecutor;
@@ -32,9 +32,9 @@ use polkadot_node_subsystem::{
ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus,
}; };
use polkadot_node_subsystem_test_helpers::{ use polkadot_node_subsystem_test_helpers::{
make_subsystem_context, TestSubsystemContext, TestSubsystemContextHandle, make_subsystem_context, TestSubsystemContext, TestSubsystemContextHandle, TestSubsystemSender,
}; };
use polkadot_node_subsystem_util::reexports::SubsystemContext; use polkadot_node_subsystem_util::{reexports::SubsystemContext, TimeoutExt};
use polkadot_primitives::v1::{ use polkadot_primitives::v1::{
BlakeTwo256, BlockNumber, CandidateDescriptor, CandidateEvent, CandidateReceipt, CoreIndex, BlakeTwo256, BlockNumber, CandidateDescriptor, CandidateEvent, CandidateReceipt, CoreIndex,
GroupIndex, Hash, HashT, HeadData, GroupIndex, Hash, HashT, HeadData,
@@ -44,105 +44,70 @@ use super::OrderingProvider;
type VirtualOverseer = TestSubsystemContextHandle<DisputeCoordinatorMessage>; type VirtualOverseer = TestSubsystemContextHandle<DisputeCoordinatorMessage>;
const OVERSEER_RECEIVE_TIMEOUT: Duration = Duration::from_secs(2);
async fn overseer_recv(virtual_overseer: &mut VirtualOverseer) -> AllMessages {
virtual_overseer
.recv()
.timeout(OVERSEER_RECEIVE_TIMEOUT)
.await
.expect("overseer `recv` timed out")
}
struct TestState { struct TestState {
next_block_number: BlockNumber, chain: Vec<Hash>,
ordering: OrderingProvider, ordering: OrderingProvider,
ctx: TestSubsystemContext<DisputeCoordinatorMessage, TaskExecutor>, ctx: TestSubsystemContext<DisputeCoordinatorMessage, TaskExecutor>,
} }
impl TestState { impl TestState {
async fn new() -> Self { async fn new() -> (Self, VirtualOverseer) {
let (mut ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new()); let (mut ctx, mut ctx_handle) = make_subsystem_context(TaskExecutor::new());
let leaf = get_activated_leaf(1); let leaf = get_activated_leaf(0);
launch_virtual_overseer(&mut ctx, ctx_handle); let chain = vec![get_block_number_hash(0)];
Self {
next_block_number: 2,
ordering: OrderingProvider::new(ctx.sender(), leaf).await.unwrap(),
ctx,
}
}
/// Get a new leaf. let finalized_block_number = 0;
fn next_leaf(&mut self) -> ActivatedLeaf { let expected_ancestry_len = 1;
let r = get_activated_leaf(self.next_block_number); let overseer_fut = overseer_process_active_leaves_update(
self.next_block_number += 1; &mut ctx_handle,
r &chain,
} finalized_block_number,
expected_ancestry_len,
);
async fn process_active_leaves_update(&mut self) { let ordering_provider =
let update = self.next_leaf(); join(OrderingProvider::new(ctx.sender(), leaf.clone()), overseer_fut)
self.ordering .await
.process_active_leaves_update( .0
self.ctx.sender(), .unwrap();
&ActiveLeavesUpdate::start_work(update),
) let test_state = Self { chain, ordering: ordering_provider, ctx };
.await
.unwrap(); (test_state, ctx_handle)
} }
} }
/// Simulate other subsystems: fn next_block_number(chain: &[Hash]) -> BlockNumber {
fn launch_virtual_overseer(ctx: &mut impl SubsystemContext, ctx_handle: VirtualOverseer) { chain.len() as u32
ctx.spawn(
"serve-active-leaves-update",
async move { virtual_overseer(ctx_handle).await }.boxed(),
)
.unwrap();
} }
async fn virtual_overseer(mut ctx_handle: VirtualOverseer) { /// Get a new leaf.
let create_ev = |relay_parent: Hash| { fn next_leaf(chain: &mut Vec<Hash>) -> ActivatedLeaf {
vec![CandidateEvent::CandidateIncluded( let next_block_number = next_block_number(chain);
make_candidate_receipt(relay_parent), let next_hash = get_block_number_hash(next_block_number);
HeadData::default(), chain.push(next_hash);
CoreIndex::from(0), get_activated_leaf(next_block_number)
GroupIndex::from(0),
)]
};
assert_matches!(
ctx_handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_,
RuntimeApiRequest::CandidateEvents(
tx,
)
)) => {
tx.send(Ok(Vec::new())).unwrap();
}
);
assert_matches!(
ctx_handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::CandidateEvents(
tx,
)
)) => {
tx.send(Ok(create_ev(relay_parent))).unwrap();
}
);
assert_matches!(
ctx_handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::BlockNumber(_relay_parent, tx)) => {
tx.send(Ok(Some(1))).unwrap();
}
);
} }
/// Get a dummy `ActivatedLeaf` for a given block number. async fn process_active_leaves_update(
fn get_activated_leaf(n: BlockNumber) -> ActivatedLeaf { sender: &mut TestSubsystemSender,
ActivatedLeaf { ordering: &mut OrderingProvider,
hash: get_block_number_hash(n), update: ActivatedLeaf,
number: n, ) {
status: LeafStatus::Fresh, ordering
span: Arc::new(jaeger::Span::Disabled), .process_active_leaves_update(sender, &ActiveLeavesUpdate::start_work(update))
} .await
} .unwrap();
/// Get a dummy relay parent hash for dummy block number.
fn get_block_number_hash(n: BlockNumber) -> Hash {
BlakeTwo256::hash(&n.encode())
} }
fn make_candidate_receipt(relay_parent: Hash) -> CandidateReceipt { fn make_candidate_receipt(relay_parent: Hash) -> CandidateReceipt {
@@ -162,22 +127,231 @@ fn make_candidate_receipt(relay_parent: Hash) -> CandidateReceipt {
candidate candidate
} }
/// Get a dummy `ActivatedLeaf` for a given block number.
fn get_activated_leaf(n: BlockNumber) -> ActivatedLeaf {
ActivatedLeaf {
hash: get_block_number_hash(n),
number: n,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}
}
/// Get a dummy relay parent hash for dummy block number.
fn get_block_number_hash(n: BlockNumber) -> Hash {
BlakeTwo256::hash(&n.encode())
}
/// Get a dummy event that corresponds to candidate inclusion for the given block number.
fn get_candidate_included_events(block_number: BlockNumber) -> Vec<CandidateEvent> {
vec![CandidateEvent::CandidateIncluded(
make_candidate_receipt(get_block_number_hash(block_number)),
HeadData::default(),
CoreIndex::from(0),
GroupIndex::from(0),
)]
}
async fn assert_candidate_events_request(virtual_overseer: &mut VirtualOverseer, chain: &[Hash]) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::CandidateEvents(tx),
)) => {
let maybe_block_number = chain.iter().position(|h| *h == hash);
let response = maybe_block_number
.map(|num| get_candidate_included_events(num as u32))
.unwrap_or_default();
tx.send(Ok(response)).unwrap();
}
);
}
async fn assert_block_number_request(virtual_overseer: &mut VirtualOverseer, chain: &[Hash]) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockNumber(relay_parent, tx)) => {
let maybe_block_number =
chain.iter().position(|hash| *hash == relay_parent).map(|number| number as u32);
tx.send(Ok(maybe_block_number)).unwrap();
}
);
}
async fn assert_finalized_block_number_request(
virtual_overseer: &mut VirtualOverseer,
response: BlockNumber,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(tx)) => {
tx.send(Ok(response)).unwrap();
}
);
}
async fn assert_block_ancestors_request(virtual_overseer: &mut VirtualOverseer, chain: &[Hash]) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::Ancestors { hash, k, response_channel }) => {
let maybe_block_position = chain.iter().position(|h| *h == hash);
let ancestors = maybe_block_position
.map(|idx| chain[..idx].iter().rev().take(k).copied().collect())
.unwrap_or_default();
response_channel.send(Ok(ancestors)).unwrap();
}
);
}
async fn overseer_process_active_leaves_update(
virtual_overseer: &mut VirtualOverseer,
chain: &[Hash],
finalized_block: BlockNumber,
expected_ancestry_len: usize,
) {
// Before walking through ancestors provider requests latest finalized block number.
assert_finalized_block_number_request(virtual_overseer, finalized_block).await;
// Expect block ancestors requests with respect to the ancestry step.
for _ in (0..expected_ancestry_len).step_by(OrderingProvider::ANCESTRY_CHUNK_SIZE) {
assert_block_ancestors_request(virtual_overseer, chain).await;
}
// For each ancestry and the head return corresponding candidates inclusions.
for _ in 0..expected_ancestry_len {
assert_candidate_events_request(virtual_overseer, chain).await;
}
}
#[test] #[test]
fn ordering_provider_provides_ordering_when_initialized() { fn ordering_provider_provides_ordering_when_initialized() {
let candidate = make_candidate_receipt(get_block_number_hash(2)); let candidate = make_candidate_receipt(get_block_number_hash(1));
futures::executor::block_on(async { futures::executor::block_on(async {
let mut state = TestState::new().await; let (state, mut virtual_overseer) = TestState::new().await;
let r = state
.ordering let TestState { mut chain, mut ordering, mut ctx } = state;
.candidate_comparator(state.ctx.sender(), &candidate)
.await let r = ordering.candidate_comparator(ctx.sender(), &candidate).await.unwrap();
.unwrap();
assert_matches!(r, None); assert_matches!(r, None);
// After next active leaves update we should have a comparator: // After next active leaves update we should have a comparator:
state.process_active_leaves_update().await; let next_update = next_leaf(&mut chain);
let r = state.ordering.candidate_comparator(state.ctx.sender(), &candidate).await;
let finalized_block_number = 0;
let expected_ancestry_len = 1;
let overseer_fut = overseer_process_active_leaves_update(
&mut virtual_overseer,
&chain,
finalized_block_number,
expected_ancestry_len,
);
join(process_active_leaves_update(ctx.sender(), &mut ordering, next_update), overseer_fut)
.await;
let r = join(
ordering.candidate_comparator(ctx.sender(), &candidate),
assert_block_number_request(&mut virtual_overseer, &chain),
)
.await
.0;
assert_matches!(r, Ok(Some(r2)) => { assert_matches!(r, Ok(Some(r2)) => {
assert_eq!(r2.relay_parent_block_number, 1); assert_eq!(r2.relay_parent_block_number, 1);
}); });
}); });
} }
#[test]
fn ordering_provider_requests_candidates_of_leaf_ancestors() {
futures::executor::block_on(async {
// How many blocks should we skip before sending a leaf update.
const BLOCKS_TO_SKIP: usize = 30;
let (state, mut virtual_overseer) = TestState::new().await;
let TestState { mut chain, mut ordering, mut ctx } = state;
let next_update = (0..BLOCKS_TO_SKIP).map(|_| next_leaf(&mut chain)).last().unwrap();
let finalized_block_number = 0;
let overseer_fut = overseer_process_active_leaves_update(
&mut virtual_overseer,
&chain,
finalized_block_number,
BLOCKS_TO_SKIP,
);
join(process_active_leaves_update(ctx.sender(), &mut ordering, next_update), overseer_fut)
.await;
let next_block_number = next_block_number(&chain);
for block_number in 1..next_block_number {
let candidate = make_candidate_receipt(get_block_number_hash(block_number));
let r = join(
ordering.candidate_comparator(ctx.sender(), &candidate),
assert_block_number_request(&mut virtual_overseer, &chain),
)
.await
.0;
assert_matches!(r, Ok(Some(r2)) => {
assert_eq!(r2.relay_parent_block_number, block_number);
});
}
});
}
#[test]
fn ordering_provider_requests_candidates_of_non_cached_ancestors() {
futures::executor::block_on(async {
// How many blocks should we skip before sending a leaf update.
const BLOCKS_TO_SKIP: &[usize] = &[30, 15];
let (state, mut virtual_overseer) = TestState::new().await;
let TestState { mut chain, mut ordering, mut ctx } = state;
let next_update = (0..BLOCKS_TO_SKIP[0]).map(|_| next_leaf(&mut chain)).last().unwrap();
let finalized_block_number = 0;
let overseer_fut = overseer_process_active_leaves_update(
&mut virtual_overseer,
&chain,
finalized_block_number,
BLOCKS_TO_SKIP[0],
);
join(process_active_leaves_update(ctx.sender(), &mut ordering, next_update), overseer_fut)
.await;
// Send the second request and verify that we don't go past the cached block.
let next_update = (0..BLOCKS_TO_SKIP[1]).map(|_| next_leaf(&mut chain)).last().unwrap();
let overseer_fut = overseer_process_active_leaves_update(
&mut virtual_overseer,
&chain,
finalized_block_number,
BLOCKS_TO_SKIP[1],
);
join(process_active_leaves_update(ctx.sender(), &mut ordering, next_update), overseer_fut)
.await;
});
}
#[test]
fn ordering_provider_requests_candidates_of_non_finalized_ancestors() {
futures::executor::block_on(async {
// How many blocks should we skip before sending a leaf update.
const BLOCKS_TO_SKIP: usize = 30;
let (state, mut virtual_overseer) = TestState::new().await;
let TestState { mut chain, mut ordering, mut ctx } = state;
let next_update = (0..BLOCKS_TO_SKIP).map(|_| next_leaf(&mut chain)).last().unwrap();
let finalized_block_number = 17;
let overseer_fut = overseer_process_active_leaves_update(
&mut virtual_overseer,
&chain,
finalized_block_number,
BLOCKS_TO_SKIP - finalized_block_number as usize, // Expect the provider not to go past finalized block.
);
join(process_active_leaves_update(ctx.sender(), &mut ordering, next_update), overseer_fut)
.await;
});
}
@@ -34,9 +34,12 @@ use parity_scale_codec::Encode;
use polkadot_node_primitives::SignedDisputeStatement; use polkadot_node_primitives::SignedDisputeStatement;
use polkadot_node_subsystem::{ use polkadot_node_subsystem::{
messages::{DisputeCoordinatorMessage, DisputeDistributionMessage, ImportStatementsResult}, messages::{
ChainApiMessage, DisputeCoordinatorMessage, DisputeDistributionMessage,
ImportStatementsResult,
},
overseer::FromOverseer, overseer::FromOverseer,
OverseerSignal, ChainApiError, OverseerSignal,
}; };
use polkadot_node_subsystem_util::TimeoutExt; use polkadot_node_subsystem_util::TimeoutExt;
use sc_keystore::LocalKeystore; use sc_keystore::LocalKeystore;
@@ -236,6 +239,19 @@ impl TestState {
) )
} }
// Since the test harness sends active leaves update for each block
// consecutively, walking back for ancestors is not necessary. Sending
// an error to the subsystem will force-skip this procedure, the ordering
// provider will only request for candidates included in the leaf.
assert_matches!(
virtual_overseer.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
tx
)) => {
tx.send(Err(ChainApiError::from(""))).unwrap();
}
);
assert_matches!( assert_matches!(
virtual_overseer.recv().await, virtual_overseer.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request( AllMessages::RuntimeApi(RuntimeApiMessage::Request(