Fix some problems with prove_warp_sync (#8037)

* Fix some problems with prove_warp_sync

* Update client/finality-grandpa/src/finality_proof.rs

Co-authored-by: cheme <emericchevalier.pro@gmail.com>

Co-authored-by: cheme <emericchevalier.pro@gmail.com>
This commit is contained in:
Ashley
2021-02-05 12:53:33 +01:00
committed by GitHub
parent 54def5f3d3
commit f78db6a778
2 changed files with 30 additions and 26 deletions
@@ -86,9 +86,9 @@ struct Request<B: BlockT> {
const WARP_SYNC_FRAGMENTS_LIMIT: usize = 100; const WARP_SYNC_FRAGMENTS_LIMIT: usize = 100;
/// Number of item with justification in warp sync cache. /// Number of item with justification in warp sync cache.
/// This should be customizable, setting a low number /// This should be customizable, but setting it to the max number of fragments
/// until then. /// we return seems like a good idea until then.
const WARP_SYNC_CACHE_SIZE: usize = 20; const WARP_SYNC_CACHE_SIZE: usize = WARP_SYNC_FRAGMENTS_LIMIT;
/// Handler for incoming grandpa warp sync requests from a remote peer. /// Handler for incoming grandpa warp sync requests from a remote peer.
pub struct GrandpaWarpSyncRequestHandler<TBackend, TBlock: BlockT> { pub struct GrandpaWarpSyncRequestHandler<TBackend, TBlock: BlockT> {
@@ -277,9 +277,9 @@ pub fn prove_warp_sync<Block: BlockT, B: BlockchainBackend<Block>>(
// This operation is a costy and only for the delay corner case. // This operation is a costy and only for the delay corner case.
while index > Zero::zero() { while index > Zero::zero() {
index = index - One::one(); index = index - One::one();
if let Some((fragement, apply_block)) = get_warp_sync_proof_fragment(blockchain, index, &mut cache)? { if let Some((fragment, apply_block)) = get_warp_sync_proof_fragment(blockchain, index, &mut cache)? {
if last_apply.map(|next| &next > header.number()).unwrap_or(false) { if last_apply.map(|next| &next > header.number()).unwrap_or(false) {
result.push(fragement); result.push(fragment);
last_apply = Some(apply_block); last_apply = Some(apply_block);
} else { } else {
break; break;
@@ -289,7 +289,7 @@ pub fn prove_warp_sync<Block: BlockT, B: BlockchainBackend<Block>>(
let mut index = *header.number(); let mut index = *header.number();
while index <= end_number { while index <= end_number {
if max_fragment_limit.map(|limit| result.len() <= limit).unwrap_or(false) { if max_fragment_limit.map(|limit| result.len() >= limit).unwrap_or(false) {
break; break;
} }
@@ -305,7 +305,10 @@ pub fn prove_warp_sync<Block: BlockT, B: BlockchainBackend<Block>>(
index = index + One::one(); index = index + One::one();
} }
if result.last().as_ref().map(|head| head.header.number()) != Some(&end_number) { let at_limit = max_fragment_limit.map(|limit| result.len() >= limit).unwrap_or(false);
// add last finalized block if reached and not already included.
if !at_limit && result.last().as_ref().map(|head| head.header.number()) != Some(&end_number) {
let header = blockchain.expect_header(end)?; let header = blockchain.expect_header(end)?;
if let Some(justification) = blockchain.justification(BlockId::Number(end_number.clone()))? { if let Some(justification) = blockchain.justification(BlockId::Number(end_number.clone()))? {
result.push(AuthoritySetProofFragment { result.push(AuthoritySetProofFragment {
@@ -328,7 +331,7 @@ fn get_warp_sync_proof_fragment<Block: BlockT, B: BlockchainBackend<Block>>(
) -> sp_blockchain::Result<Option<(AuthoritySetProofFragment<Block::Header>, NumberFor<Block>)>> { ) -> sp_blockchain::Result<Option<(AuthoritySetProofFragment<Block::Header>, NumberFor<Block>)>> {
if let Some(cache) = cache.as_mut() { if let Some(cache) = cache.as_mut() {
if let Some(result) = cache.get_item(index) { if let Some(result) = cache.get_item(index) {
return Ok(result.clone()); return Ok(result);
} }
} }
@@ -541,11 +544,11 @@ impl<Block: BlockT> BlockJustification<Block::Header> for GrandpaJustification<B
/// Simple cache for warp sync queries. /// Simple cache for warp sync queries.
pub struct WarpSyncFragmentCache<Header: HeaderT> { pub struct WarpSyncFragmentCache<Header: HeaderT> {
header_has_proof_fragment: std::collections::HashMap<Header::Number, bool>,
cache: linked_hash_map::LinkedHashMap< cache: linked_hash_map::LinkedHashMap<
Header::Number, Header::Number,
Option<(AuthoritySetProofFragment<Header>, Header::Number)>, (AuthoritySetProofFragment<Header>, Header::Number),
>, >,
headers_with_justification: usize,
limit: usize, limit: usize,
} }
@@ -553,8 +556,8 @@ impl<Header: HeaderT> WarpSyncFragmentCache<Header> {
/// Instantiate a new cache for the warp sync prover. /// Instantiate a new cache for the warp sync prover.
pub fn new(size: usize) -> Self { pub fn new(size: usize) -> Self {
WarpSyncFragmentCache { WarpSyncFragmentCache {
header_has_proof_fragment: Default::default(),
cache: Default::default(), cache: Default::default(),
headers_with_justification: 0,
limit: size, limit: size,
} }
} }
@@ -564,31 +567,32 @@ impl<Header: HeaderT> WarpSyncFragmentCache<Header> {
at: Header::Number, at: Header::Number,
item: Option<(AuthoritySetProofFragment<Header>, Header::Number)>, item: Option<(AuthoritySetProofFragment<Header>, Header::Number)>,
) { ) {
if self.cache.len() == self.limit { self.header_has_proof_fragment.insert(at, item.is_some());
self.pop_one();
if let Some(item) = item {
if self.cache.len() == self.limit {
self.pop_one();
}
self.cache.insert(at, item);
} }
if item.is_some() {
// we do not check previous value as cached value is always supposed to
// be queried before calling 'new_item'.
self.headers_with_justification += 1;
}
self.cache.insert(at, item);
} }
fn pop_one(&mut self) { fn pop_one(&mut self) {
while let Some(v) = self.cache.pop_front() { if let Some((header_number, _)) = self.cache.pop_front() {
if v.1.is_some() { self.header_has_proof_fragment.remove(&header_number);
self.headers_with_justification -= 1;
break;
}
} }
} }
fn get_item( fn get_item(
&mut self, &mut self,
block: Header::Number, block: Header::Number,
) -> Option<&mut Option<(AuthoritySetProofFragment<Header>, Header::Number)>> { ) -> Option<Option<(AuthoritySetProofFragment<Header>, Header::Number)>> {
self.cache.get_refresh(&block) match self.header_has_proof_fragment.get(&block) {
Some(true) => Some(self.cache.get_refresh(&block).cloned()),
Some(false) => Some(None),
None => None
}
} }
} }