// Copyright 2021 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . use sp_runtime::traits::Block as BlockT; use polkadot_node_primitives::AvailableData; use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage; use polkadot_overseer::OverseerHandler; use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt}; use std::{collections::HashSet, pin::Pin}; /// The active candidate recovery. /// /// This handles the candidate recovery and tracks the activate recoveries. pub(crate) struct ActiveCandidateRecovery { /// The recoveries that are currently being executed. recoveries: FuturesUnordered< Pin)> + Send>>, >, /// The block hashes of the candidates currently being recovered. candidates: HashSet, overseer_handler: OverseerHandler, } impl ActiveCandidateRecovery { pub fn new(overseer_handler: OverseerHandler) -> Self { Self { recoveries: Default::default(), candidates: Default::default(), overseer_handler, } } /// Recover the given `pending_candidate`. pub async fn recover_candidate( &mut self, block_hash: Block::Hash, pending_candidate: crate::PendingCandidate, ) { let (tx, rx) = oneshot::channel(); self.overseer_handler .send_msg(AvailabilityRecoveryMessage::RecoverAvailableData( pending_candidate.receipt, pending_candidate.session_index, None, tx, )) .await; self.candidates.insert(block_hash); self.recoveries.push( async move { match rx.await { Ok(Ok(res)) => (block_hash, Some(res)), Ok(Err(error)) => { tracing::debug!( target: crate::LOG_TARGET, ?error, ?block_hash, "Availability recovery failed", ); (block_hash, None) } Err(_) => { tracing::debug!( target: crate::LOG_TARGET, "Availability recovery oneshot channel closed", ); (block_hash, None) } } } .boxed(), ); } /// Returns if the given `candidate` is being recovered. pub fn is_being_recovered(&self, candidate: &Block::Hash) -> bool { self.candidates.contains(candidate) } /// Waits for the next recovery. /// /// If the returned [`AvailableData`] is `None`, it means that the recovery failed. pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option) { loop { if let Some(res) = self.recoveries.next().await { self.candidates.remove(&res.0); return res; } else { futures::pending!() } } } }