// Copyright 2021 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! FuturesUndead: A `FuturesUnordered` with support for semi canceled futures. Those undead //! futures will still get polled, but will not count towards length. So length will only count //! futures, which are still considered live. //! //! Use case: If futures take longer than we would like them too, we may be able to request the data //! from somewhere else as well. We don't really want to cancel the old future, because maybe it //! was almost done, thus we would have wasted time with our impatience. By simply making them //! not count towards length, we can make sure to have enough "live" requests ongoing, while at the //! same time taking advantage of some maybe "late" response from the undead. //! use std::{ pin::Pin, task::{Context, Poll}, time::Duration, }; use futures::{future::BoxFuture, stream::FuturesUnordered, Future, Stream, StreamExt}; use polkadot_node_subsystem_util::TimeoutExt; /// FuturesUndead - `FuturesUnordered` with semi canceled (undead) futures. /// /// Limitations: Keeps track of undead futures by means of a counter, which is limited to 64 /// bits, so after `1.8*10^19` pushed futures, this implementation will panic. pub struct FuturesUndead { /// Actual `FuturesUnordered`. inner: FuturesUnordered>, /// Next sequence number to assign to the next future that gets pushed. next_sequence: SequenceNumber, /// Sequence number of first future considered live. first_live: Option, /// How many undead are there right now. undead: usize, } /// All futures get a number, to determine which are live. #[derive(Eq, PartialEq, Copy, Clone, Debug, PartialOrd)] struct SequenceNumber(usize); struct Undead { inner: BoxFuture<'static, Output>, our_sequence: SequenceNumber, } impl FuturesUndead { pub fn new() -> Self { Self { inner: FuturesUnordered::new(), next_sequence: SequenceNumber(0), first_live: None, undead: 0, } } pub fn push(&mut self, f: BoxFuture<'static, Output>) { self.inner.push(Undead { inner: f, our_sequence: self.next_sequence }); self.next_sequence.inc(); } /// Make all contained futures undead. /// /// They will no longer be counted on a call to `len`. pub fn soft_cancel(&mut self) { self.undead = self.inner.len(); self.first_live = Some(self.next_sequence); } /// Number of contained futures minus undead. pub fn len(&self) -> usize { self.inner.len() - self.undead } /// Total number of futures, including undead. pub fn total_len(&self) -> usize { self.inner.len() } /// Wait for next future to return with timeout. /// /// When timeout passes, return `None` and make all currently contained futures undead. pub async fn next_with_timeout(&mut self, timeout: Duration) -> Option { match self.next().timeout(timeout).await { // Timeout: None => { self.soft_cancel(); None }, Some(inner) => inner, } } } impl Stream for FuturesUndead { type Item = Output; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.inner.poll_next_unpin(cx) { Poll::Pending => Poll::Pending, Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some((sequence, v))) => { // Cleanup in case we became completely empty: if self.inner.len() == 0 { *self = Self::new(); return Poll::Ready(Some(v)) } let first_live = match self.first_live { None => return Poll::Ready(Some(v)), Some(first_live) => first_live, }; // An undead came back: if sequence < first_live { self.undead = self.undead.saturating_sub(1); } Poll::Ready(Some(v)) }, } } } impl SequenceNumber { pub fn inc(&mut self) { self.0 = self.0.checked_add(1).expect( "We don't expect an `UndeadFuture` to live long enough for 2^64 entries ever getting inserted." ); } } impl Future for Undead { type Output = (SequenceNumber, T); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.inner.as_mut().poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(v) => Poll::Ready((self.our_sequence, v)), } } } #[cfg(test)] mod tests { use super::*; use futures::{executor, pending, FutureExt}; #[test] fn cancel_sets_len_to_zero() { let mut undead = FuturesUndead::new(); undead.push((async { () }).boxed()); assert_eq!(undead.len(), 1); undead.soft_cancel(); assert_eq!(undead.len(), 0); } #[test] fn finished_undead_does_not_change_len() { executor::block_on(async { let mut undead = FuturesUndead::new(); undead.push(async { 1_i32 }.boxed()); undead.push(async { 2_i32 }.boxed()); assert_eq!(undead.len(), 2); undead.soft_cancel(); assert_eq!(undead.len(), 0); undead.push( async { pending!(); 0_i32 } .boxed(), ); undead.next().await; assert_eq!(undead.len(), 1); undead.push(async { 9_i32 }.boxed()); undead.soft_cancel(); assert_eq!(undead.len(), 0); }); } #[test] fn len_stays_correct_when_live_future_ends() { executor::block_on(async { let mut undead = FuturesUndead::new(); undead.push( async { pending!(); 1_i32 } .boxed(), ); undead.push( async { pending!(); 2_i32 } .boxed(), ); assert_eq!(undead.len(), 2); undead.soft_cancel(); assert_eq!(undead.len(), 0); undead.push(async { 0_i32 }.boxed()); undead.push(async { 1_i32 }.boxed()); undead.next().await; assert_eq!(undead.len(), 1); undead.next().await; assert_eq!(undead.len(), 0); undead.push(async { 9_i32 }.boxed()); assert_eq!(undead.len(), 1); }); } #[test] fn cleanup_works() { executor::block_on(async { let mut undead = FuturesUndead::new(); undead.push(async { 1_i32 }.boxed()); undead.soft_cancel(); undead.push(async { 2_i32 }.boxed()); undead.next().await; undead.next().await; assert_eq!(undead.first_live, None); }); } }