From 462ca043e53ff1c50582aedbed94919ad3b6433f Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 16 Jun 2021 20:10:50 +0100 Subject: [PATCH] extract determine_new_blocks into a separate utility (#3261) * extract determine_new_blocks into a separate utility * rework docs --- .../node/core/approval-voting/src/import.rs | 534 +----------------- .../src/determine_new_blocks.rs | 520 +++++++++++++++++ polkadot/node/subsystem-util/src/lib.rs | 2 + 3 files changed, 532 insertions(+), 524 deletions(-) create mode 100644 polkadot/node/subsystem-util/src/determine_new_blocks.rs diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index 74a966171e..4661d62de5 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -34,6 +34,7 @@ use polkadot_node_subsystem::{ }, SubsystemContext, SubsystemError, SubsystemResult, }; +use polkadot_node_subsystem_util::determine_new_blocks; use polkadot_node_subsystem_util::rolling_session_window::{ RollingSessionWindow, SessionWindowUpdate, }; @@ -63,112 +64,6 @@ use crate::time::{slot_number_to_tick, Tick}; use super::{LOG_TARGET, State, DBReader}; -// Given a new chain-head hash, this determines the hashes of all new blocks we should track -// metadata for, given this head. The list will typically include the `head` hash provided unless -// that block is already known, in which case the list should be empty. This is guaranteed to be -// a subset of the ancestry of `head`, as well as `head`, starting from `head` and moving -// backwards. -// -// This returns the entire ancestry up to the last finalized block's height or the last item we -// have in the DB. This may be somewhat expensive when first recovering from major sync. -async fn determine_new_blocks( - ctx: &mut impl SubsystemContext, - db: &impl DBReader, - head: Hash, - header: &Header, - finalized_number: BlockNumber, -) -> SubsystemResult> { - const ANCESTRY_STEP: usize = 4; - - // Early exit if the block is in the DB or too early. - { - let already_known = db.load_block_entry(&head)? - .is_some(); - - let before_relevant = header.number <= finalized_number; - - if already_known || before_relevant { - return Ok(Vec::new()); - } - } - - let mut ancestry = vec![(head, header.clone())]; - - // Early exit if the parent hash is in the DB. - if db.load_block_entry(&header.parent_hash)? - .is_some() - { - return Ok(ancestry); - } - - 'outer: loop { - let &(ref last_hash, ref last_header) = ancestry.last() - .expect("ancestry has length 1 at initialization and is only added to; qed"); - - // If we iterated back to genesis, which can happen at the beginning of chains. - if last_header.number <= 1 { - break 'outer - } - - let (tx, rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::Ancestors { - hash: *last_hash, - k: ANCESTRY_STEP, - response_channel: tx, - }.into()).await; - - // Continue past these errors. - let batch_hashes = match rx.await { - Err(_) | Ok(Err(_)) => break 'outer, - Ok(Ok(ancestors)) => ancestors, - }; - - let batch_headers = { - let (batch_senders, batch_receivers) = (0..batch_hashes.len()) - .map(|_| oneshot::channel()) - .unzip::<_, _, Vec<_>, Vec<_>>(); - - for (hash, sender) in batch_hashes.iter().cloned().zip(batch_senders) { - ctx.send_message(ChainApiMessage::BlockHeader(hash, sender).into()).await; - } - - let mut requests = futures::stream::FuturesOrdered::new(); - batch_receivers.into_iter().map(|rx| async move { - match rx.await { - Err(_) | Ok(Err(_)) => None, - Ok(Ok(h)) => h, - } - }) - .for_each(|x| requests.push(x)); - - let batch_headers: Vec<_> = requests - .flat_map(|x: Option
| stream::iter(x)) - .collect() - .await; - - // Any failed header fetch of the batch will yield a `None` result that will - // be skipped. Any failure at this stage means we'll just ignore those blocks - // as the chain DB has failed us. - if batch_headers.len() != batch_hashes.len() { break 'outer } - batch_headers - }; - - for (hash, header) in batch_hashes.into_iter().zip(batch_headers) { - let is_known = db.load_block_entry(&hash)?.is_some(); - - let is_relevant = header.number > finalized_number; - - if is_known || !is_relevant { - break 'outer - } - - ancestry.push((hash, header)); - } - } - - Ok(ancestry) -} - struct ImportedBlockInfo { included_candidates: Vec<(CandidateHash, CandidateReceipt, CoreIndex, GroupIndex)>, session_index: SessionIndex, @@ -445,9 +340,15 @@ pub(crate) async fn handle_new_head( // If we've just started the node and haven't yet received any finality notifications, // we don't do any look-back. Approval voting is only for nodes were already online. - let finalized_number = finalized_number.unwrap_or(header.number.saturating_sub(1)); + let lower_bound_number = finalized_number.unwrap_or(header.number.saturating_sub(1)); - let new_blocks = determine_new_blocks(ctx, &state.db, head, &header, finalized_number) + let new_blocks = determine_new_blocks( + ctx.sender(), + |h| state.db.load_block_entry(h).map(|e| e.is_some()), + head, + &header, + lower_bound_number, + ) .map_err(|e| SubsystemError::with_origin("approval-voting", e)) .await?; @@ -741,86 +642,6 @@ mod tests { } } - #[derive(Clone)] - struct TestChain { - start_number: BlockNumber, - headers: Vec
, - numbers: HashMap, - } - - impl TestChain { - fn new(start: BlockNumber, len: usize) -> Self { - assert!(len > 0, "len must be at least 1"); - - let base = Header { - digest: Default::default(), - extrinsics_root: Default::default(), - number: start, - state_root: Default::default(), - parent_hash: Default::default(), - }; - - let base_hash = base.hash(); - - let mut chain = TestChain { - start_number: start, - headers: vec![base], - numbers: vec![(base_hash, start)].into_iter().collect(), - }; - - for _ in 1..len { - chain.grow() - } - - chain - } - - fn grow(&mut self) { - let next = { - let last = self.headers.last().unwrap(); - Header { - digest: Default::default(), - extrinsics_root: Default::default(), - number: last.number + 1, - state_root: Default::default(), - parent_hash: last.hash(), - } - }; - - self.numbers.insert(next.hash(), next.number); - self.headers.push(next); - } - - fn header_by_number(&self, number: BlockNumber) -> Option<&Header> { - if number < self.start_number { - None - } else { - self.headers.get((number - self.start_number) as usize) - } - } - - fn header_by_hash(&self, hash: &Hash) -> Option<&Header> { - self.numbers.get(hash).and_then(|n| self.header_by_number(*n)) - } - - fn hash_by_number(&self, number: BlockNumber) -> Option { - self.header_by_number(number).map(|h| h.hash()) - } - - fn ancestry(&self, hash: &Hash, k: BlockNumber) -> Vec { - let n = match self.numbers.get(hash) { - None => return Vec::new(), - Some(&n) => n, - }; - - (0..k) - .map(|i| i + 1) - .filter_map(|i| self.header_by_number(n - i)) - .map(|h| h.hash()) - .collect() - } - } - struct MockAssignmentCriteria; impl AssignmentCriteria for MockAssignmentCriteria { @@ -856,340 +677,6 @@ mod tests { (VRFOutput(o.to_output()), VRFProof(p)) } - #[test] - fn determine_new_blocks_back_to_finalized() { - let pool = TaskExecutor::new(); - let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); - - let db = TestDB::default(); - - let chain = TestChain::new(10, 9); - - let head = chain.header_by_number(18).unwrap().clone(); - let head_hash = head.hash(); - let finalized_number = 12; - - // Finalized block should be omitted. The head provided to `determine_new_blocks` - // should be included. - let expected_ancestry = (13..=18) - .map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap()) - .rev() - .collect::>(); - - let test_fut = Box::pin(async move { - let ancestry = determine_new_blocks( - &mut ctx, - &db, - head_hash, - &head, - finalized_number, - ).await.unwrap(); - - assert_eq!( - ancestry, - expected_ancestry, - ); - }); - - let aux_fut = Box::pin(async move { - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::Ancestors { - hash: h, - k, - response_channel: tx, - }) => { - assert_eq!(h, head_hash); - assert_eq!(k, 4); - let _ = tx.send(Ok(chain.ancestry(&h, k as _))); - } - ); - - for _ in 0..4 { - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => { - let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone()))); - } - ); - } - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::Ancestors { - hash: h, - k, - response_channel: tx, - }) => { - assert_eq!(h, chain.hash_by_number(14).unwrap()); - assert_eq!(k, 4); - let _ = tx.send(Ok(chain.ancestry(&h, k as _))); - } - ); - - for _ in 0..4 { - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => { - let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone()))); - } - ); - } - - }); - - futures::executor::block_on(futures::future::join(test_fut, aux_fut)); - } - - #[test] - fn determine_new_blocks_back_to_known() { - let pool = TaskExecutor::new(); - let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); - - let mut db = TestDB::default(); - - let chain = TestChain::new(10, 9); - - let head = chain.header_by_number(18).unwrap().clone(); - let head_hash = head.hash(); - let finalized_number = 12; - let known_number = 15; - let known_hash = chain.hash_by_number(known_number).unwrap(); - - db.block_entries.insert( - known_hash, - crate::approval_db::v1::BlockEntry { - block_hash: known_hash, - parent_hash: Default::default(), - block_number: known_number, - session: 1, - slot: Slot::from(100), - relay_vrf_story: Default::default(), - candidates: Vec::new(), - approved_bitfield: Default::default(), - children: Vec::new(), - }.into(), - ); - - // Known block should be omitted. The head provided to `determine_new_blocks` - // should be included. - let expected_ancestry = (16..=18) - .map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap()) - .rev() - .collect::>(); - - let test_fut = Box::pin(async move { - let ancestry = determine_new_blocks( - &mut ctx, - &db, - head_hash, - &head, - finalized_number, - ).await.unwrap(); - - assert_eq!( - ancestry, - expected_ancestry, - ); - }); - - let aux_fut = Box::pin(async move { - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::Ancestors { - hash: h, - k, - response_channel: tx, - }) => { - assert_eq!(h, head_hash); - assert_eq!(k, 4); - let _ = tx.send(Ok(chain.ancestry(&h, k as _))); - } - ); - - for _ in 0u32..4 { - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => { - let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone()))); - } - ); - } - }); - - futures::executor::block_on(futures::future::join(test_fut, aux_fut)); - } - - #[test] - fn determine_new_blocks_already_known_is_empty() { - let pool = TaskExecutor::new(); - let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone()); - - let mut db = TestDB::default(); - - let chain = TestChain::new(10, 9); - - let head = chain.header_by_number(18).unwrap().clone(); - let head_hash = head.hash(); - let finalized_number = 0; - - db.block_entries.insert( - head_hash, - crate::approval_db::v1::BlockEntry { - block_hash: head_hash, - parent_hash: Default::default(), - block_number: 18, - session: 1, - slot: Slot::from(100), - relay_vrf_story: Default::default(), - candidates: Vec::new(), - approved_bitfield: Default::default(), - children: Vec::new(), - }.into(), - ); - - // Known block should be omitted. - let expected_ancestry = Vec::new(); - - let test_fut = Box::pin(async move { - let ancestry = determine_new_blocks( - &mut ctx, - &db, - head_hash, - &head, - finalized_number, - ).await.unwrap(); - - assert_eq!( - ancestry, - expected_ancestry, - ); - }); - - futures::executor::block_on(test_fut); - } - - #[test] - fn determine_new_blocks_parent_known_is_fast() { - let pool = TaskExecutor::new(); - let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone()); - - let mut db = TestDB::default(); - - let chain = TestChain::new(10, 9); - - let head = chain.header_by_number(18).unwrap().clone(); - let head_hash = head.hash(); - let finalized_number = 0; - let parent_hash = chain.hash_by_number(17).unwrap(); - - db.block_entries.insert( - parent_hash, - crate::approval_db::v1::BlockEntry { - block_hash: parent_hash, - parent_hash: Default::default(), - block_number: 18, - session: 1, - slot: Slot::from(100), - relay_vrf_story: Default::default(), - candidates: Vec::new(), - approved_bitfield: Default::default(), - children: Vec::new(), - }.into(), - ); - - // New block should be the only new one. - let expected_ancestry = vec![(head_hash, head.clone())]; - - let test_fut = Box::pin(async move { - let ancestry = determine_new_blocks( - &mut ctx, - &db, - head_hash, - &head, - finalized_number, - ).await.unwrap(); - - assert_eq!( - ancestry, - expected_ancestry, - ); - }); - - futures::executor::block_on(test_fut); - } - - #[test] - fn determine_new_block_before_finality_is_empty() { - let pool = TaskExecutor::new(); - let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone()); - - let chain = TestChain::new(10, 9); - - let head = chain.header_by_number(18).unwrap().clone(); - let head_hash = head.hash(); - let parent_hash = chain.hash_by_number(17).unwrap(); - let mut db = TestDB::default(); - - db.block_entries.insert( - parent_hash, - crate::approval_db::v1::BlockEntry { - block_hash: parent_hash, - parent_hash: Default::default(), - block_number: 18, - session: 1, - slot: Slot::from(100), - relay_vrf_story: Default::default(), - candidates: Vec::new(), - approved_bitfield: Default::default(), - children: Vec::new(), - }.into(), - ); - - let test_fut = Box::pin(async move { - let after_finality = determine_new_blocks( - &mut ctx, - &db, - head_hash, - &head, - 17, - ).await.unwrap(); - - let at_finality = determine_new_blocks( - &mut ctx, - &db, - head_hash, - &head, - 18, - ).await.unwrap(); - - let before_finality = determine_new_blocks( - &mut ctx, - &db, - head_hash, - &head, - 19, - ).await.unwrap(); - - assert_eq!( - after_finality, - vec![(head_hash, head.clone())], - ); - - assert_eq!( - at_finality, - Vec::new(), - ); - - assert_eq!( - before_finality, - Vec::new(), - ); - }); - - futures::executor::block_on(test_fut); - } - fn dummy_session_info(index: SessionIndex) -> SessionInfo { SessionInfo { validators: Vec::new(), @@ -1678,8 +1165,7 @@ mod tests { let slot = Slot::from(10); - let chain = TestChain::new(4, 1); - let parent_hash = chain.header_by_number(4).unwrap().hash(); + let parent_hash = Hash::repeat_byte(0x01); let header = Header { digest: { diff --git a/polkadot/node/subsystem-util/src/determine_new_blocks.rs b/polkadot/node/subsystem-util/src/determine_new_blocks.rs new file mode 100644 index 0000000000..205ca1d8f1 --- /dev/null +++ b/polkadot/node/subsystem-util/src/determine_new_blocks.rs @@ -0,0 +1,520 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! A utility for fetching all unknown blocks based on a new chain-head hash. + +use polkadot_node_subsystem::{ + messages::ChainApiMessage, + SubsystemSender, SubsystemError, SubsystemResult, +}; +use polkadot_primitives::v1::{Hash, Header, BlockNumber}; +use futures::prelude::*; +use futures::channel::oneshot; + +/// Given a new chain-head hash, this determines the hashes of all new blocks we should track +/// metadata for, given this head. +/// +/// This is guaranteed to be a subset of the (inclusive) ancestry of `head` determined as all +/// blocks above the lower bound or above the highest known block, whichever is higher. +/// This is formatted in descending order by block height. +/// +/// An implication of this is that if `head` itself is known or not above the lower bound, +/// then the returned list will be empty. +/// +/// This may be somewhat expensive when first recovering from major sync. +pub async fn determine_new_blocks( + ctx: &mut impl SubsystemSender, + is_known: impl Fn(&Hash) -> Result, + head: Hash, + header: &Header, + lower_bound_number: BlockNumber, +) -> SubsystemResult> + where SubsystemError: From +{ + const ANCESTRY_STEP: usize = 4; + + // Early exit if the block is in the DB or too early. + { + let already_known = is_known(&head)?; + + let before_relevant = header.number <= lower_bound_number; + + if already_known || before_relevant { + return Ok(Vec::new()); + } + } + + let mut ancestry = vec![(head, header.clone())]; + + // Early exit if the parent hash is in the DB. + if is_known(&header.parent_hash)? { + return Ok(ancestry); + } + + 'outer: loop { + let &(ref last_hash, ref last_header) = ancestry.last() + .expect("ancestry has length 1 at initialization and is only added to; qed"); + + // If we iterated back to genesis, which can happen at the beginning of chains. + if last_header.number <= 1 { + break 'outer + } + + let (tx, rx) = oneshot::channel(); + ctx.send_message(ChainApiMessage::Ancestors { + hash: *last_hash, + k: ANCESTRY_STEP, + response_channel: tx, + }.into()).await; + + // Continue past these errors. + let batch_hashes = match rx.await { + Err(_) | Ok(Err(_)) => break 'outer, + Ok(Ok(ancestors)) => ancestors, + }; + + let batch_headers = { + let (batch_senders, batch_receivers) = (0..batch_hashes.len()) + .map(|_| oneshot::channel()) + .unzip::<_, _, Vec<_>, Vec<_>>(); + + for (hash, sender) in batch_hashes.iter().cloned().zip(batch_senders) { + ctx.send_message(ChainApiMessage::BlockHeader(hash, sender).into()).await; + } + + let mut requests = futures::stream::FuturesOrdered::new(); + batch_receivers.into_iter().map(|rx| async move { + match rx.await { + Err(_) | Ok(Err(_)) => None, + Ok(Ok(h)) => h, + } + }) + .for_each(|x| requests.push(x)); + + let batch_headers: Vec<_> = requests + .flat_map(|x: Option
| stream::iter(x)) + .collect() + .await; + + // Any failed header fetch of the batch will yield a `None` result that will + // be skipped. Any failure at this stage means we'll just ignore those blocks + // as the chain DB has failed us. + if batch_headers.len() != batch_hashes.len() { break 'outer } + batch_headers + }; + + for (hash, header) in batch_hashes.into_iter().zip(batch_headers) { + let is_known = is_known(&hash)?; + + let is_relevant = header.number > lower_bound_number; + + if is_known || !is_relevant { + break 'outer + } + + ancestry.push((hash, header)); + } + } + + Ok(ancestry) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::{HashSet, HashMap}; + use sp_core::testing::TaskExecutor; + use polkadot_node_subsystem::{messages::AllMessages, SubsystemContext}; + use polkadot_node_subsystem_test_helpers::make_subsystem_context; + use assert_matches::assert_matches; + + #[derive(Default)] + struct TestKnownBlocks { + blocks: HashSet, + } + + impl TestKnownBlocks { + fn insert(&mut self, hash: Hash) { + self.blocks.insert(hash); + } + + fn is_known(&self, hash: &Hash) -> Result { + Ok(self.blocks.contains(hash)) + } + } + + #[derive(Clone)] + struct TestChain { + start_number: BlockNumber, + headers: Vec
, + numbers: HashMap, + } + + impl TestChain { + fn new(start: BlockNumber, len: usize) -> Self { + assert!(len > 0, "len must be at least 1"); + + let base = Header { + digest: Default::default(), + extrinsics_root: Default::default(), + number: start, + state_root: Default::default(), + parent_hash: Default::default(), + }; + + let base_hash = base.hash(); + + let mut chain = TestChain { + start_number: start, + headers: vec![base], + numbers: vec![(base_hash, start)].into_iter().collect(), + }; + + for _ in 1..len { + chain.grow() + } + + chain + } + + fn grow(&mut self) { + let next = { + let last = self.headers.last().unwrap(); + Header { + digest: Default::default(), + extrinsics_root: Default::default(), + number: last.number + 1, + state_root: Default::default(), + parent_hash: last.hash(), + } + }; + + self.numbers.insert(next.hash(), next.number); + self.headers.push(next); + } + + fn header_by_number(&self, number: BlockNumber) -> Option<&Header> { + if number < self.start_number { + None + } else { + self.headers.get((number - self.start_number) as usize) + } + } + + fn header_by_hash(&self, hash: &Hash) -> Option<&Header> { + self.numbers.get(hash).and_then(|n| self.header_by_number(*n)) + } + + fn hash_by_number(&self, number: BlockNumber) -> Option { + self.header_by_number(number).map(|h| h.hash()) + } + + fn ancestry(&self, hash: &Hash, k: BlockNumber) -> Vec { + let n = match self.numbers.get(hash) { + None => return Vec::new(), + Some(&n) => n, + }; + + (0..k) + .map(|i| i + 1) + .filter_map(|i| self.header_by_number(n - i)) + .map(|h| h.hash()) + .collect() + } + } + + #[test] + fn determine_new_blocks_back_to_lower_bound() { + let pool = TaskExecutor::new(); + let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); + + let known = TestKnownBlocks::default(); + + let chain = TestChain::new(10, 9); + + let head = chain.header_by_number(18).unwrap().clone(); + let head_hash = head.hash(); + let lower_bound_number = 12; + + // Finalized block should be omitted. The head provided to `determine_new_blocks` + // should be included. + let expected_ancestry = (13..=18) + .map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap()) + .rev() + .collect::>(); + + let test_fut = Box::pin(async move { + let ancestry = determine_new_blocks( + ctx.sender(), + |h| known.is_known(h), + head_hash, + &head, + lower_bound_number, + ).await.unwrap(); + + assert_eq!( + ancestry, + expected_ancestry, + ); + }); + + let aux_fut = Box::pin(async move { + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::Ancestors { + hash: h, + k, + response_channel: tx, + }) => { + assert_eq!(h, head_hash); + assert_eq!(k, 4); + let _ = tx.send(Ok(chain.ancestry(&h, k as _))); + } + ); + + for _ in 0u32..4 { + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => { + let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone()))); + } + ); + } + + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::Ancestors { + hash: h, + k, + response_channel: tx, + }) => { + assert_eq!(h, chain.hash_by_number(14).unwrap()); + assert_eq!(k, 4); + let _ = tx.send(Ok(chain.ancestry(&h, k as _))); + } + ); + + for _ in 0..4 { + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => { + let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone()))); + } + ); + } + + }); + + futures::executor::block_on(futures::future::join(test_fut, aux_fut)); + } + + #[test] + fn determine_new_blocks_back_to_known() { + let pool = TaskExecutor::new(); + let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); + + let mut known = TestKnownBlocks::default(); + + let chain = TestChain::new(10, 9); + + let head = chain.header_by_number(18).unwrap().clone(); + let head_hash = head.hash(); + let lower_bound_number = 12; + let known_number = 15; + let known_hash = chain.hash_by_number(known_number).unwrap(); + + known.insert(known_hash); + + // Known block should be omitted. The head provided to `determine_new_blocks` + // should be included. + let expected_ancestry = (16..=18) + .map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap()) + .rev() + .collect::>(); + + let test_fut = Box::pin(async move { + let ancestry = determine_new_blocks( + ctx.sender(), + |h| known.is_known(h), + head_hash, + &head, + lower_bound_number, + ).await.unwrap(); + + assert_eq!( + ancestry, + expected_ancestry, + ); + }); + + let aux_fut = Box::pin(async move { + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::Ancestors { + hash: h, + k, + response_channel: tx, + }) => { + assert_eq!(h, head_hash); + assert_eq!(k, 4); + let _ = tx.send(Ok(chain.ancestry(&h, k as _))); + } + ); + + for _ in 0u32..4 { + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => { + let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone()))); + } + ); + } + }); + + futures::executor::block_on(futures::future::join(test_fut, aux_fut)); + } + + #[test] + fn determine_new_blocks_already_known_is_empty() { + let pool = TaskExecutor::new(); + let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone()); + + let mut known = TestKnownBlocks::default(); + + let chain = TestChain::new(10, 9); + + let head = chain.header_by_number(18).unwrap().clone(); + let head_hash = head.hash(); + let lower_bound_number = 0; + + known.insert(head_hash); + + // Known block should be omitted. + let expected_ancestry = Vec::new(); + + let test_fut = Box::pin(async move { + let ancestry = determine_new_blocks( + ctx.sender(), + |h| known.is_known(h), + head_hash, + &head, + lower_bound_number, + ).await.unwrap(); + + assert_eq!( + ancestry, + expected_ancestry, + ); + }); + + futures::executor::block_on(test_fut); + } + + #[test] + fn determine_new_blocks_parent_known_is_fast() { + let pool = TaskExecutor::new(); + let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone()); + + let mut known = TestKnownBlocks::default(); + + let chain = TestChain::new(10, 9); + + let head = chain.header_by_number(18).unwrap().clone(); + let head_hash = head.hash(); + let lower_bound_number = 0; + let parent_hash = chain.hash_by_number(17).unwrap(); + + known.insert(parent_hash); + + // New block should be the only new one. + let expected_ancestry = vec![(head_hash, head.clone())]; + + let test_fut = Box::pin(async move { + let ancestry = determine_new_blocks( + ctx.sender(), + |h| known.is_known(h), + head_hash, + &head, + lower_bound_number, + ).await.unwrap(); + + assert_eq!( + ancestry, + expected_ancestry, + ); + }); + + futures::executor::block_on(test_fut); + } + + #[test] + fn determine_new_block_before_finality_is_empty() { + let pool = TaskExecutor::new(); + let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone()); + + let chain = TestChain::new(10, 9); + + let head = chain.header_by_number(18).unwrap().clone(); + let head_hash = head.hash(); + let parent_hash = chain.hash_by_number(17).unwrap(); + let mut known = TestKnownBlocks::default(); + + known.insert(parent_hash); + + let test_fut = Box::pin(async move { + let after_finality = determine_new_blocks( + ctx.sender(), + |h| known.is_known(h), + head_hash, + &head, + 17, + ).await.unwrap(); + + let at_finality = determine_new_blocks( + ctx.sender(), + |h| known.is_known(h), + head_hash, + &head, + 18, + ).await.unwrap(); + + let before_finality = determine_new_blocks( + ctx.sender(), + |h| known.is_known(h), + head_hash, + &head, + 19, + ).await.unwrap(); + + assert_eq!( + after_finality, + vec![(head_hash, head.clone())], + ); + + assert_eq!( + at_finality, + Vec::new(), + ); + + assert_eq!( + before_finality, + Vec::new(), + ); + }); + + futures::executor::block_on(test_fut); + } +} diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index fac5cdc7b3..80ac5d9b2f 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -54,6 +54,7 @@ use thiserror::Error; pub use metered_channel as metered; pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS; +pub use determine_new_blocks::determine_new_blocks; /// Error classification. pub use error_handling::{Fault, unwrap_non_fatal}; @@ -72,6 +73,7 @@ pub mod runtime; /// A rolling session window cache. pub mod rolling_session_window; +mod determine_new_blocks; mod error_handling; #[cfg(test)]