client/finality-granpda/until_imported: Rework pinning (#5983)

An `UntilImported` stream wraps a `Stream` of incoming messages and
waits for blocks those messages are based on before passing the messages
on.

The above `Stream` of incoming messages implements `Unpin`, thus there
is no need to use `pin_project` on the `UntilImported` struct. Instead
one only has to add the `Unpin` trait bound on the `I` trait parameter.
This commit is contained in:
Max Inden
2020-05-12 12:48:28 +02:00
committed by GitHub
parent 6013522ab0
commit 1ce85590ff
@@ -31,7 +31,7 @@ use super::{
use log::{debug, warn}; use log::{debug, warn};
use sp_utils::mpsc::TracingUnboundedReceiver; use sp_utils::mpsc::TracingUnboundedReceiver;
use futures::prelude::*; use futures::prelude::*;
use futures::stream::Fuse; use futures::stream::{Fuse, StreamExt};
use futures_timer::Delay; use futures_timer::Delay;
use finality_grandpa::voter; use finality_grandpa::voter;
use parking_lot::Mutex; use parking_lot::Mutex;
@@ -137,14 +137,16 @@ impl Drop for Metrics {
} }
} }
/// Buffering imported messages until blocks with given hashes are imported. /// Buffering incoming messages until blocks with given hashes are imported.
#[pin_project::pin_project] pub(crate) struct UntilImported<Block, BlockStatus, BlockSyncRequester, I, M> where
pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester, I, M: BlockUntilImported<Block>> { Block: BlockT,
I: Stream<Item = M::Blocked> + Unpin,
M: BlockUntilImported<Block>,
{
import_notifications: Fuse<TracingUnboundedReceiver<BlockImportNotification<Block>>>, import_notifications: Fuse<TracingUnboundedReceiver<BlockImportNotification<Block>>>,
block_sync_requester: BlockSyncRequester, block_sync_requester: BlockSyncRequester,
status_check: BlockStatus, status_check: BlockStatus,
#[pin] incoming_messages: Fuse<I>,
inner: Fuse<I>,
ready: VecDeque<M::Blocked>, ready: VecDeque<M::Blocked>,
/// Interval at which to check status of each awaited block. /// Interval at which to check status of each awaited block.
check_pending: Pin<Box<dyn Stream<Item = Result<(), std::io::Error>> + Send + Sync>>, check_pending: Pin<Box<dyn Stream<Item = Result<(), std::io::Error>> + Send + Sync>>,
@@ -159,11 +161,17 @@ pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester,
metrics: Option<Metrics>, metrics: Option<Metrics>,
} }
impl<Block, BlockStatus, BlockSyncRequester, I, M> Unpin for UntilImported<Block, BlockStatus, BlockSyncRequester, I, M > where
Block: BlockT,
I: Stream<Item = M::Blocked> + Unpin,
M: BlockUntilImported<Block>,
{}
impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockStatus, BlockSyncRequester, I, M> where impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockStatus, BlockSyncRequester, I, M> where
Block: BlockT, Block: BlockT,
BlockStatus: BlockStatusT<Block>, BlockStatus: BlockStatusT<Block>,
BlockSyncRequester: BlockSyncRequesterT<Block>, BlockSyncRequester: BlockSyncRequesterT<Block>,
I: Stream<Item = M::Blocked>, I: Stream<Item = M::Blocked> + Unpin,
M: BlockUntilImported<Block>, M: BlockUntilImported<Block>,
{ {
/// Create a new `UntilImported` wrapper. /// Create a new `UntilImported` wrapper.
@@ -171,7 +179,7 @@ impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockSta
import_notifications: ImportNotifications<Block>, import_notifications: ImportNotifications<Block>,
block_sync_requester: BlockSyncRequester, block_sync_requester: BlockSyncRequester,
status_check: BlockStatus, status_check: BlockStatus,
stream: I, incoming_messages: I,
identifier: &'static str, identifier: &'static str,
metrics: Option<Metrics>, metrics: Option<Metrics>,
) -> Self { ) -> Self {
@@ -192,7 +200,7 @@ impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockSta
import_notifications: import_notifications.fuse(), import_notifications: import_notifications.fuse(),
block_sync_requester, block_sync_requester,
status_check, status_check,
inner: stream.fuse(), incoming_messages: incoming_messages.fuse(),
ready: VecDeque::new(), ready: VecDeque::new(),
check_pending: Box::pin(check_pending), check_pending: Box::pin(check_pending),
pending: HashMap::new(), pending: HashMap::new(),
@@ -206,23 +214,23 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
Block: BlockT, Block: BlockT,
BStatus: BlockStatusT<Block>, BStatus: BlockStatusT<Block>,
BSyncRequester: BlockSyncRequesterT<Block>, BSyncRequester: BlockSyncRequesterT<Block>,
I: Stream<Item = M::Blocked>, I: Stream<Item = M::Blocked> + Unpin,
M: BlockUntilImported<Block>, M: BlockUntilImported<Block>,
{ {
type Item = Result<M::Blocked, Error>; type Item = Result<M::Blocked, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// We are using a `this` variable in order to allow multiple simultaneous mutable borrow // We are using a `this` variable in order to allow multiple simultaneous mutable borrow to
// to `self`. // `self`.
let mut this = self.project(); let this = &mut *self;
loop { loop {
match Stream::poll_next(Pin::new(&mut this.inner), cx) { match StreamExt::poll_next_unpin(&mut this.incoming_messages, cx) {
Poll::Ready(None) => return Poll::Ready(None), Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(input)) => { Poll::Ready(Some(input)) => {
// new input: schedule wait of any parts which require // new input: schedule wait of any parts which require
// blocks to be known. // blocks to be known.
match M::needs_waiting(input, this.status_check)? { match M::needs_waiting(input, &this.status_check)? {
DiscardWaitOrReady::Discard => {}, DiscardWaitOrReady::Discard => {},
DiscardWaitOrReady::Wait(items) => { DiscardWaitOrReady::Wait(items) => {
for (target_hash, target_number, wait) in items { for (target_hash, target_number, wait) in items {
@@ -245,7 +253,7 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
} }
loop { loop {
match Stream::poll_next(Pin::new(&mut this.import_notifications), cx) { match StreamExt::poll_next_unpin(&mut this.import_notifications, cx) {
Poll::Ready(None) => return Poll::Ready(None), Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(notification)) => { Poll::Ready(Some(notification)) => {
// new block imported. queue up all messages tied to that hash. // new block imported. queue up all messages tied to that hash.
@@ -315,7 +323,7 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
return Poll::Ready(Some(Ok(ready))) return Poll::Ready(Some(Ok(ready)))
} }
if this.import_notifications.is_done() && this.inner.is_done() { if this.import_notifications.is_done() && this.incoming_messages.is_done() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Pending Poll::Pending