// Copyright 2021 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! A utility for fetching all unknown blocks based on a new chain-head hash. use polkadot_node_subsystem::{ messages::ChainApiMessage, SubsystemSender, SubsystemError, SubsystemResult, }; use polkadot_primitives::v1::{Hash, Header, BlockNumber}; use futures::prelude::*; use futures::channel::oneshot; /// Given a new chain-head hash, this determines the hashes of all new blocks we should track /// metadata for, given this head. /// /// This is guaranteed to be a subset of the (inclusive) ancestry of `head` determined as all /// blocks above the lower bound or above the highest known block, whichever is higher. /// This is formatted in descending order by block height. /// /// An implication of this is that if `head` itself is known or not above the lower bound, /// then the returned list will be empty. /// /// This may be somewhat expensive when first recovering from major sync. pub async fn determine_new_blocks( ctx: &mut impl SubsystemSender, is_known: impl Fn(&Hash) -> Result, head: Hash, header: &Header, lower_bound_number: BlockNumber, ) -> SubsystemResult> where SubsystemError: From { const ANCESTRY_STEP: usize = 4; // Early exit if the block is in the DB or too early. { let already_known = is_known(&head)?; let before_relevant = header.number <= lower_bound_number; if already_known || before_relevant { return Ok(Vec::new()); } } let mut ancestry = vec![(head, header.clone())]; // Early exit if the parent hash is in the DB. if is_known(&header.parent_hash)? { return Ok(ancestry); } 'outer: loop { let &(ref last_hash, ref last_header) = ancestry.last() .expect("ancestry has length 1 at initialization and is only added to; qed"); // If we iterated back to genesis, which can happen at the beginning of chains. if last_header.number <= 1 { break 'outer } let (tx, rx) = oneshot::channel(); ctx.send_message(ChainApiMessage::Ancestors { hash: *last_hash, k: ANCESTRY_STEP, response_channel: tx, }.into()).await; // Continue past these errors. let batch_hashes = match rx.await { Err(_) | Ok(Err(_)) => break 'outer, Ok(Ok(ancestors)) => ancestors, }; let batch_headers = { let (batch_senders, batch_receivers) = (0..batch_hashes.len()) .map(|_| oneshot::channel()) .unzip::<_, _, Vec<_>, Vec<_>>(); for (hash, sender) in batch_hashes.iter().cloned().zip(batch_senders) { ctx.send_message(ChainApiMessage::BlockHeader(hash, sender).into()).await; } let mut requests = futures::stream::FuturesOrdered::new(); batch_receivers.into_iter().map(|rx| async move { match rx.await { Err(_) | Ok(Err(_)) => None, Ok(Ok(h)) => h, } }) .for_each(|x| requests.push(x)); let batch_headers: Vec<_> = requests .flat_map(|x: Option
| stream::iter(x)) .collect() .await; // Any failed header fetch of the batch will yield a `None` result that will // be skipped. Any failure at this stage means we'll just ignore those blocks // as the chain DB has failed us. if batch_headers.len() != batch_hashes.len() { break 'outer } batch_headers }; for (hash, header) in batch_hashes.into_iter().zip(batch_headers) { let is_known = is_known(&hash)?; let is_relevant = header.number > lower_bound_number; if is_known || !is_relevant { break 'outer } ancestry.push((hash, header)); } } Ok(ancestry) } #[cfg(test)] mod tests { use super::*; use std::collections::{HashSet, HashMap}; use sp_core::testing::TaskExecutor; use polkadot_node_subsystem::{messages::AllMessages, SubsystemContext}; use polkadot_node_subsystem_test_helpers::make_subsystem_context; use assert_matches::assert_matches; #[derive(Default)] struct TestKnownBlocks { blocks: HashSet, } impl TestKnownBlocks { fn insert(&mut self, hash: Hash) { self.blocks.insert(hash); } fn is_known(&self, hash: &Hash) -> Result { Ok(self.blocks.contains(hash)) } } #[derive(Clone)] struct TestChain { start_number: BlockNumber, headers: Vec
, numbers: HashMap, } impl TestChain { fn new(start: BlockNumber, len: usize) -> Self { assert!(len > 0, "len must be at least 1"); let base = Header { digest: Default::default(), extrinsics_root: Default::default(), number: start, state_root: Default::default(), parent_hash: Default::default(), }; let base_hash = base.hash(); let mut chain = TestChain { start_number: start, headers: vec![base], numbers: vec![(base_hash, start)].into_iter().collect(), }; for _ in 1..len { chain.grow() } chain } fn grow(&mut self) { let next = { let last = self.headers.last().unwrap(); Header { digest: Default::default(), extrinsics_root: Default::default(), number: last.number + 1, state_root: Default::default(), parent_hash: last.hash(), } }; self.numbers.insert(next.hash(), next.number); self.headers.push(next); } fn header_by_number(&self, number: BlockNumber) -> Option<&Header> { if number < self.start_number { None } else { self.headers.get((number - self.start_number) as usize) } } fn header_by_hash(&self, hash: &Hash) -> Option<&Header> { self.numbers.get(hash).and_then(|n| self.header_by_number(*n)) } fn hash_by_number(&self, number: BlockNumber) -> Option { self.header_by_number(number).map(|h| h.hash()) } fn ancestry(&self, hash: &Hash, k: BlockNumber) -> Vec { let n = match self.numbers.get(hash) { None => return Vec::new(), Some(&n) => n, }; (0..k) .map(|i| i + 1) .filter_map(|i| self.header_by_number(n - i)) .map(|h| h.hash()) .collect() } } #[test] fn determine_new_blocks_back_to_lower_bound() { let pool = TaskExecutor::new(); let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); let known = TestKnownBlocks::default(); let chain = TestChain::new(10, 9); let head = chain.header_by_number(18).unwrap().clone(); let head_hash = head.hash(); let lower_bound_number = 12; // Finalized block should be omitted. The head provided to `determine_new_blocks` // should be included. let expected_ancestry = (13..=18) .map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap()) .rev() .collect::>(); let test_fut = Box::pin(async move { let ancestry = determine_new_blocks( ctx.sender(), |h| known.is_known(h), head_hash, &head, lower_bound_number, ).await.unwrap(); assert_eq!( ancestry, expected_ancestry, ); }); let aux_fut = Box::pin(async move { assert_matches!( handle.recv().await, AllMessages::ChainApi(ChainApiMessage::Ancestors { hash: h, k, response_channel: tx, }) => { assert_eq!(h, head_hash); assert_eq!(k, 4); let _ = tx.send(Ok(chain.ancestry(&h, k as _))); } ); for _ in 0u32..4 { assert_matches!( handle.recv().await, AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => { let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone()))); } ); } assert_matches!( handle.recv().await, AllMessages::ChainApi(ChainApiMessage::Ancestors { hash: h, k, response_channel: tx, }) => { assert_eq!(h, chain.hash_by_number(14).unwrap()); assert_eq!(k, 4); let _ = tx.send(Ok(chain.ancestry(&h, k as _))); } ); for _ in 0..4 { assert_matches!( handle.recv().await, AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => { let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone()))); } ); } }); futures::executor::block_on(futures::future::join(test_fut, aux_fut)); } #[test] fn determine_new_blocks_back_to_known() { let pool = TaskExecutor::new(); let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); let mut known = TestKnownBlocks::default(); let chain = TestChain::new(10, 9); let head = chain.header_by_number(18).unwrap().clone(); let head_hash = head.hash(); let lower_bound_number = 12; let known_number = 15; let known_hash = chain.hash_by_number(known_number).unwrap(); known.insert(known_hash); // Known block should be omitted. The head provided to `determine_new_blocks` // should be included. let expected_ancestry = (16..=18) .map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap()) .rev() .collect::>(); let test_fut = Box::pin(async move { let ancestry = determine_new_blocks( ctx.sender(), |h| known.is_known(h), head_hash, &head, lower_bound_number, ).await.unwrap(); assert_eq!( ancestry, expected_ancestry, ); }); let aux_fut = Box::pin(async move { assert_matches!( handle.recv().await, AllMessages::ChainApi(ChainApiMessage::Ancestors { hash: h, k, response_channel: tx, }) => { assert_eq!(h, head_hash); assert_eq!(k, 4); let _ = tx.send(Ok(chain.ancestry(&h, k as _))); } ); for _ in 0u32..4 { assert_matches!( handle.recv().await, AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => { let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone()))); } ); } }); futures::executor::block_on(futures::future::join(test_fut, aux_fut)); } #[test] fn determine_new_blocks_already_known_is_empty() { let pool = TaskExecutor::new(); let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone()); let mut known = TestKnownBlocks::default(); let chain = TestChain::new(10, 9); let head = chain.header_by_number(18).unwrap().clone(); let head_hash = head.hash(); let lower_bound_number = 0; known.insert(head_hash); // Known block should be omitted. let expected_ancestry = Vec::new(); let test_fut = Box::pin(async move { let ancestry = determine_new_blocks( ctx.sender(), |h| known.is_known(h), head_hash, &head, lower_bound_number, ).await.unwrap(); assert_eq!( ancestry, expected_ancestry, ); }); futures::executor::block_on(test_fut); } #[test] fn determine_new_blocks_parent_known_is_fast() { let pool = TaskExecutor::new(); let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone()); let mut known = TestKnownBlocks::default(); let chain = TestChain::new(10, 9); let head = chain.header_by_number(18).unwrap().clone(); let head_hash = head.hash(); let lower_bound_number = 0; let parent_hash = chain.hash_by_number(17).unwrap(); known.insert(parent_hash); // New block should be the only new one. let expected_ancestry = vec![(head_hash, head.clone())]; let test_fut = Box::pin(async move { let ancestry = determine_new_blocks( ctx.sender(), |h| known.is_known(h), head_hash, &head, lower_bound_number, ).await.unwrap(); assert_eq!( ancestry, expected_ancestry, ); }); futures::executor::block_on(test_fut); } #[test] fn determine_new_block_before_finality_is_empty() { let pool = TaskExecutor::new(); let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone()); let chain = TestChain::new(10, 9); let head = chain.header_by_number(18).unwrap().clone(); let head_hash = head.hash(); let parent_hash = chain.hash_by_number(17).unwrap(); let mut known = TestKnownBlocks::default(); known.insert(parent_hash); let test_fut = Box::pin(async move { let after_finality = determine_new_blocks( ctx.sender(), |h| known.is_known(h), head_hash, &head, 17, ).await.unwrap(); let at_finality = determine_new_blocks( ctx.sender(), |h| known.is_known(h), head_hash, &head, 18, ).await.unwrap(); let before_finality = determine_new_blocks( ctx.sender(), |h| known.is_known(h), head_hash, &head, 19, ).await.unwrap(); assert_eq!( after_finality, vec![(head_hash, head.clone())], ); assert_eq!( at_finality, Vec::new(), ); assert_eq!( before_finality, Vec::new(), ); }); futures::executor::block_on(test_fut); } }