client/finality-grandpa/src/until_imported: Refactor schedule_wait (#5386)

* client/finality-grandpa/src/until_imported: Refactor schedule_wait

Previously `BlockUntilImported::schedule_wait` took two closures, one to
report ready items and one to report items to await. None of the implementors of
`BlockUntilImported` call both closures. From a symantic perspective it
would as well not make sense to both await something and state that it
is ready.

Instead with this commit `BlockUntilImported::schedule_wait` simply
returns whether the given item needs waiting or is ready to be passed
on.

This reduces complexity by:

- Removing side effects through the two closures.

- Reducing borrowing given that `UntilImported` `ready` and `pending`
don't need to be borrowed from within the two closures.

- Removes the need for trait bounds for the two closures.

* client/finality-grandpa/src/until_imported: Fix comments

Co-Authored-By: André Silva <andre.beat@gmail.com>

Co-authored-by: André Silva <andre.beat@gmail.com>
This commit is contained in:
Max Inden
2020-03-26 13:49:45 +01:00
committed by GitHub
parent eb135ccb56
commit 2a67e6c437
@@ -47,30 +47,36 @@ use sp_finality_grandpa::AuthorityId;
const LOG_PENDING_INTERVAL: Duration = Duration::from_secs(15); const LOG_PENDING_INTERVAL: Duration = Duration::from_secs(15);
// something which will block until imported. /// Something that needs to be withheld until specific blocks are available.
///
/// For example a GRANDPA commit message which is not of any use without the corresponding block
/// that it commits on.
pub(crate) trait BlockUntilImported<Block: BlockT>: Sized { pub(crate) trait BlockUntilImported<Block: BlockT>: Sized {
// the type that is blocked on. /// The type that is blocked on.
type Blocked; type Blocked;
/// new incoming item. For all internal items, /// Check if a new incoming item needs awaiting until a block(s) is imported.
/// check if they require to be waited for. fn needs_waiting<S: BlockStatusT<Block>>(
/// if so, call the `Wait` closure.
/// if they are ready, call the `Ready` closure.
fn schedule_wait<S, Wait, Ready>(
input: Self::Blocked, input: Self::Blocked,
status_check: &S, status_check: &S,
wait: Wait, ) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error>;
ready: Ready,
) -> Result<(), Error> where
S: BlockStatusT<Block>,
Wait: FnMut(Block::Hash, NumberFor<Block>, Self),
Ready: FnMut(Self::Blocked);
/// called when the wait has completed. The canonical number is passed through /// called when the wait has completed. The canonical number is passed through
/// for further checks. /// for further checks.
fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked>; fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked>;
} }
/// Describes whether a given [`BlockUntilImported`] (a) should be discarded, (b) is waiting for
/// specific blocks to be imported or (c) is ready to be used.
///
/// A reason for discarding a [`BlockUntilImported`] would be if a referenced block is perceived
/// under a different number than specified in the message.
pub(crate) enum DiscardWaitOrReady<Block: BlockT, W, R> {
Discard,
Wait(Vec<(Block::Hash, NumberFor<Block>, W)>),
Ready(R),
}
/// Buffering imported messages until blocks with given hashes are imported. /// Buffering imported messages until blocks with given hashes are imported.
#[pin_project::pin_project] #[pin_project::pin_project]
pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester, I, M: BlockUntilImported<Block>> { pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester, I, M: BlockUntilImported<Block>> {
@@ -149,18 +155,19 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
Poll::Ready(Some(input)) => { Poll::Ready(Some(input)) => {
// new input: schedule wait of any parts which require // new input: schedule wait of any parts which require
// blocks to be known. // blocks to be known.
let ready = &mut this.ready; match M::needs_waiting(input, this.status_check)? {
let pending = &mut this.pending; DiscardWaitOrReady::Discard => {},
M::schedule_wait( DiscardWaitOrReady::Wait(items) => {
input, for (target_hash, target_number, wait) in items {
this.status_check, this.pending
|target_hash, target_number, wait| pending .entry(target_hash)
.entry(target_hash) .or_insert_with(|| (target_number, Instant::now(), Vec::new()))
.or_insert_with(|| (target_number, Instant::now(), Vec::new())) .2
.2 .push(wait)
.push(wait), }
|ready_item| ready.push_back(ready_item), },
)?; DiscardWaitOrReady::Ready(item) => this.ready.push_back(item),
}
} }
Poll::Pending => break, Poll::Pending => break,
} }
@@ -255,29 +262,22 @@ fn warn_authority_wrong_target<H: ::std::fmt::Display>(hash: H, id: AuthorityId)
impl<Block: BlockT> BlockUntilImported<Block> for SignedMessage<Block> { impl<Block: BlockT> BlockUntilImported<Block> for SignedMessage<Block> {
type Blocked = Self; type Blocked = Self;
fn schedule_wait<BlockStatus, Wait, Ready>( fn needs_waiting<BlockStatus: BlockStatusT<Block>>(
msg: Self::Blocked, msg: Self::Blocked,
status_check: &BlockStatus, status_check: &BlockStatus,
mut wait: Wait, ) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error> {
mut ready: Ready,
) -> Result<(), Error> where
BlockStatus: BlockStatusT<Block>,
Wait: FnMut(Block::Hash, NumberFor<Block>, Self),
Ready: FnMut(Self::Blocked),
{
let (&target_hash, target_number) = msg.target(); let (&target_hash, target_number) = msg.target();
if let Some(number) = status_check.block_number(target_hash)? { if let Some(number) = status_check.block_number(target_hash)? {
if number != target_number { if number != target_number {
warn_authority_wrong_target(target_hash, msg.id); warn_authority_wrong_target(target_hash, msg.id);
return Ok(DiscardWaitOrReady::Discard);
} else { } else {
ready(msg); return Ok(DiscardWaitOrReady::Ready(msg));
} }
} else {
wait(target_hash, target_number, msg)
} }
Ok(()) return Ok(DiscardWaitOrReady::Wait(vec![(target_hash, target_number, msg)]))
} }
fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> { fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> {
@@ -321,16 +321,10 @@ impl<Block: BlockT> Unpin for BlockGlobalMessage<Block> {}
impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> { impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
type Blocked = CommunicationIn<Block>; type Blocked = CommunicationIn<Block>;
fn schedule_wait<BlockStatus, Wait, Ready>( fn needs_waiting<BlockStatus: BlockStatusT<Block>>(
input: Self::Blocked, input: Self::Blocked,
status_check: &BlockStatus, status_check: &BlockStatus,
mut wait: Wait, ) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error> {
mut ready: Ready,
) -> Result<(), Error> where
BlockStatus: BlockStatusT<Block>,
Wait: FnMut(Block::Hash, NumberFor<Block>, Self),
Ready: FnMut(Self::Blocked),
{
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
enum KnownOrUnknown<N> { enum KnownOrUnknown<N> {
@@ -348,7 +342,6 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
} }
let mut checked_hashes: HashMap<_, KnownOrUnknown<NumberFor<Block>>> = HashMap::new(); let mut checked_hashes: HashMap<_, KnownOrUnknown<NumberFor<Block>>> = HashMap::new();
let mut unknown_count = 0;
{ {
// returns false when should early exit. // returns false when should early exit.
@@ -363,7 +356,6 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
} else { } else {
entry.insert(KnownOrUnknown::Unknown(perceived_number)); entry.insert(KnownOrUnknown::Unknown(perceived_number));
unknown_count += 1;
perceived_number perceived_number
} }
} }
@@ -388,7 +380,7 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
for (target_number, target_hash) in precommit_targets { for (target_number, target_hash) in precommit_targets {
if !query_known(target_hash, target_number)? { if !query_known(target_hash, target_number)? {
return Ok(()) return Ok(DiscardWaitOrReady::Discard);
} }
} }
}, },
@@ -406,38 +398,34 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
for (target_number, target_hash) in targets { for (target_number, target_hash) in targets {
if !query_known(target_hash, target_number)? { if !query_known(target_hash, target_number)? {
return Ok(()) return Ok(DiscardWaitOrReady::Discard);
} }
} }
}, },
}; };
} }
// none of the hashes in the global message were unknown. let unknown_hashes = checked_hashes.into_iter().filter_map(|(hash, num)| match num {
// we can just return the message directly. KnownOrUnknown::Unknown(number) => Some((hash, number)),
if unknown_count == 0 { KnownOrUnknown::Known(_) => None,
ready(input); }).collect::<Vec<_>>();
return Ok(())
if unknown_hashes.is_empty() {
// none of the hashes in the global message were unknown.
// we can just return the message directly.
return Ok(DiscardWaitOrReady::Ready(input));
} }
let locked_global = Arc::new(Mutex::new(Some(input))); let locked_global = Arc::new(Mutex::new(Some(input)));
let items_to_await = unknown_hashes.into_iter().map(|(hash, target_number)| {
(hash, target_number, BlockGlobalMessage { inner: locked_global.clone(), target_number })
}).collect();
// schedule waits for all unknown messages. // schedule waits for all unknown messages.
// when the last one of these has `wait_completed` called on it, // when the last one of these has `wait_completed` called on it,
// the global message will be returned. // the global message will be returned.
// Ok(DiscardWaitOrReady::Wait(items_to_await))
// in the future, we may want to issue sync requests to the network
// if this is taking a long time.
for (hash, is_known) in checked_hashes {
if let KnownOrUnknown::Unknown(target_number) = is_known {
wait(hash, target_number, BlockGlobalMessage {
inner: locked_global.clone(),
target_number,
})
}
}
Ok(())
} }
fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> { fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> {