mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 04:41:03 +00:00
Pov recovery for parachains (#445)
* Start with a failing integration test & some refactorings * More work * Make it "work" * Add `NullConsensus` for the test * More refactorings * Move stuff over to its own crate * Refactorings * Integrate it into `service` and make the test working * Docs and some exit condition * Use the real import queue * Fix tests * Update client/pov-recovery/src/active_candidate_recovery.rs Co-authored-by: Bernhard Schuster <bernhard@ahoi.io> * Fetch slot duration from the relay chain * Docs * Fixes Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
This commit is contained in:
@@ -0,0 +1,53 @@
|
||||
[package]
|
||||
name = "cumulus-client-pov-recovery"
|
||||
version = "0.1.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
description = "Cumulus-specific networking protocol"
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
# Substrate deps
|
||||
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
|
||||
# Polkadot deps
|
||||
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
|
||||
polkadot-statement-table = { git = "https://github.com/paritytech/polkadot", branch = "master" }
|
||||
polkadot-node-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
|
||||
polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" }
|
||||
polkadot-parachain = { git = "https://github.com/paritytech/polkadot", branch = "master" }
|
||||
polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" }
|
||||
polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot", branch = "master" }
|
||||
|
||||
# Cumulus deps
|
||||
cumulus-primitives-core = { path = "../../primitives/core" }
|
||||
|
||||
# other deps
|
||||
codec = { package = "parity-scale-codec", version = "2.0.0", features = [ "derive" ] }
|
||||
futures = { version = "0.3.1", features = ["compat"] }
|
||||
futures-timer = "3.0.2"
|
||||
tracing = "0.1.22"
|
||||
rand = "0.8.3"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "0.2.21", features = ["macros"] }
|
||||
|
||||
# Cumulus deps
|
||||
cumulus-test-service = { path = "../../test/service" }
|
||||
|
||||
# Polkadot deps
|
||||
polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "master" }
|
||||
|
||||
# substrate deps
|
||||
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sc-cli = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
substrate-test-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
@@ -0,0 +1,112 @@
|
||||
// Copyright 2021 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use sp_runtime::traits::Block as BlockT;
|
||||
|
||||
use polkadot_node_primitives::AvailableData;
|
||||
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
|
||||
use polkadot_overseer::OverseerHandler;
|
||||
|
||||
use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};
|
||||
|
||||
use std::{collections::HashSet, pin::Pin};
|
||||
|
||||
/// The active candidate recovery.
|
||||
///
|
||||
/// This handles the candidate recovery and tracks the activate recoveries.
|
||||
pub(crate) struct ActiveCandidateRecovery<Block: BlockT> {
|
||||
/// The recoveries that are currently being executed.
|
||||
recoveries: FuturesUnordered<
|
||||
Pin<Box<dyn Future<Output = (Block::Hash, Option<AvailableData>)> + Send>>,
|
||||
>,
|
||||
/// The block hashes of the candidates currently being recovered.
|
||||
candidates: HashSet<Block::Hash>,
|
||||
overseer_handler: OverseerHandler,
|
||||
}
|
||||
|
||||
impl<Block: BlockT> ActiveCandidateRecovery<Block> {
|
||||
pub fn new(overseer_handler: OverseerHandler) -> Self {
|
||||
Self {
|
||||
recoveries: Default::default(),
|
||||
candidates: Default::default(),
|
||||
overseer_handler,
|
||||
}
|
||||
}
|
||||
|
||||
/// Recover the given `pending_candidate`.
|
||||
pub async fn recover_candidate(
|
||||
&mut self,
|
||||
block_hash: Block::Hash,
|
||||
pending_candidate: crate::PendingCandidate<Block>,
|
||||
) {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.overseer_handler
|
||||
.send_msg(AvailabilityRecoveryMessage::RecoverAvailableData(
|
||||
pending_candidate.receipt,
|
||||
pending_candidate.session_index,
|
||||
None,
|
||||
tx,
|
||||
))
|
||||
.await;
|
||||
|
||||
self.candidates.insert(block_hash);
|
||||
|
||||
self.recoveries.push(
|
||||
async move {
|
||||
match rx.await {
|
||||
Ok(Ok(res)) => (block_hash, Some(res)),
|
||||
Ok(Err(error)) => {
|
||||
tracing::debug!(
|
||||
target: crate::LOG_TARGET,
|
||||
?error,
|
||||
?block_hash,
|
||||
"Availability recovery failed",
|
||||
);
|
||||
(block_hash, None)
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::debug!(
|
||||
target: crate::LOG_TARGET,
|
||||
"Availability recovery oneshot channel closed",
|
||||
);
|
||||
(block_hash, None)
|
||||
}
|
||||
}
|
||||
}
|
||||
.boxed(),
|
||||
);
|
||||
}
|
||||
|
||||
/// Returns if the given `candidate` is being recovered.
|
||||
pub fn is_being_recovered(&self, candidate: &Block::Hash) -> bool {
|
||||
self.candidates.contains(candidate)
|
||||
}
|
||||
|
||||
/// Waits for the next recovery.
|
||||
///
|
||||
/// If the returned [`AvailableData`] is `None`, it means that the recovery failed.
|
||||
pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option<AvailableData>) {
|
||||
loop {
|
||||
if let Some(res) = self.recoveries.next().await {
|
||||
self.candidates.remove(&res.0);
|
||||
return res;
|
||||
} else {
|
||||
futures::pending!()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,456 @@
|
||||
// Copyright 2021 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Cumulus.
|
||||
|
||||
// Cumulus is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Parachain PoV recovery
|
||||
//!
|
||||
//! A parachain needs to build PoVs that are send to the relay chain to progress. These PoVs are
|
||||
//! erasure encoded and one piece of it is stored by each relay chain validator. As the relay chain
|
||||
//! decides on which PoV per parachain to include and thus, to progess the parachain it can happen
|
||||
//! that the block corresponding to this PoV isn't propagated in the parachain network. This can have
|
||||
//! several reasons, either a malicious collator that managed to include its own PoV and doesn't want
|
||||
//! to share it with the rest of the network or maybe a collator went down before it could distribute
|
||||
//! the block in the network. When something like this happens we can use the PoV recovery algorithm
|
||||
//! implemented in this crate to recover a PoV and to propagate it with the rest of the network. This
|
||||
//! protocol is only executed by the collators, to not overwhelm the relay chain validators.
|
||||
//!
|
||||
//! It works in the following way:
|
||||
//!
|
||||
//! 1. For every included relay chain block we note the backed candidate of our parachain. If the
|
||||
//! block belonging to the PoV is already known, we do nothing. Otherwise we start
|
||||
//! a timer that waits a random time between 0..relay_chain_slot_length before starting to recover
|
||||
//! the PoV.
|
||||
//!
|
||||
//! 2. If between starting and firing the timer the block is imported, we skip the recovery of the
|
||||
//! PoV.
|
||||
//!
|
||||
//! 3. If the timer fired we recover the PoV using the relay chain PoV recovery protocol. After it
|
||||
//! is recovered, we restore the block and import it.
|
||||
//!
|
||||
//! If we need to recover multiple PoV blocks (which should hopefully not happen in real life), we
|
||||
//! make sure that the blocks are imported in the correct order.
|
||||
|
||||
use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
|
||||
use sp_api::ProvideRuntimeApi;
|
||||
use sp_consensus::{
|
||||
import_queue::{ImportQueue, IncomingBlock},
|
||||
BlockOrigin, BlockStatus,
|
||||
};
|
||||
use sp_runtime::{
|
||||
generic::BlockId,
|
||||
traits::{Block as BlockT, Header as HeaderT, NumberFor},
|
||||
};
|
||||
|
||||
use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT};
|
||||
use polkadot_overseer::OverseerHandler;
|
||||
use polkadot_primitives::v1::{
|
||||
Block as PBlock, CandidateReceipt, CommittedCandidateReceipt, Id as ParaId, ParachainHost,
|
||||
SessionIndex,
|
||||
};
|
||||
|
||||
use cumulus_primitives_core::ParachainBlockData;
|
||||
|
||||
use codec::Decode;
|
||||
use futures::{select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
|
||||
use futures_timer::Delay;
|
||||
use rand::{thread_rng, Rng};
|
||||
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
mod active_candidate_recovery;
|
||||
use active_candidate_recovery::ActiveCandidateRecovery;
|
||||
|
||||
const LOG_TARGET: &str = "cumulus-pov-recovery";
|
||||
|
||||
/// Represents a pending candidate.
|
||||
struct PendingCandidate<Block: BlockT> {
|
||||
receipt: CandidateReceipt,
|
||||
session_index: SessionIndex,
|
||||
block_number: NumberFor<Block>,
|
||||
}
|
||||
|
||||
/// Encapsulates the logic of the pov recovery.
|
||||
pub struct PoVRecovery<Block: BlockT, PC, IQ, RC> {
|
||||
/// All the pending candidates that we are waiting for to be imported or that need to be
|
||||
/// recovered when `next_candidate_to_recover` tells us to do so.
|
||||
pending_candidates: HashMap<Block::Hash, PendingCandidate<Block>>,
|
||||
/// A stream of futures that resolve to hashes of candidates that need to be recovered.
|
||||
///
|
||||
/// The candidates to the hashes are stored in `pending_candidates`. If a candidate is not
|
||||
/// available anymore in this map, it means that it was already imported.
|
||||
next_candidate_to_recover: FuturesUnordered<Pin<Box<dyn Future<Output = Block::Hash> + Send>>>,
|
||||
active_candidate_recovery: ActiveCandidateRecovery<Block>,
|
||||
/// Blocks that wait that the parent is imported.
|
||||
///
|
||||
/// Uses parent -> blocks mapping.
|
||||
waiting_for_parent: HashMap<Block::Hash, Vec<Block>>,
|
||||
relay_chain_slot_duration: Duration,
|
||||
parachain_client: Arc<PC>,
|
||||
parachain_import_queue: IQ,
|
||||
relay_chain_client: Arc<RC>,
|
||||
para_id: ParaId,
|
||||
}
|
||||
|
||||
impl<Block: BlockT, PC, IQ, RC> PoVRecovery<Block, PC, IQ, RC>
|
||||
where
|
||||
PC: BlockBackend<Block> + BlockchainEvents<Block> + UsageProvider<Block>,
|
||||
RC: ProvideRuntimeApi<PBlock> + BlockchainEvents<PBlock>,
|
||||
RC::Api: ParachainHost<PBlock>,
|
||||
IQ: ImportQueue<Block>,
|
||||
{
|
||||
/// Create a new instance.
|
||||
pub fn new(
|
||||
overseer_handler: OverseerHandler,
|
||||
relay_chain_slot_duration: Duration,
|
||||
parachain_client: Arc<PC>,
|
||||
parachain_import_queue: IQ,
|
||||
relay_chain_client: Arc<RC>,
|
||||
para_id: ParaId,
|
||||
) -> Self {
|
||||
Self {
|
||||
pending_candidates: HashMap::new(),
|
||||
next_candidate_to_recover: Default::default(),
|
||||
active_candidate_recovery: ActiveCandidateRecovery::new(overseer_handler),
|
||||
relay_chain_slot_duration,
|
||||
waiting_for_parent: HashMap::new(),
|
||||
parachain_client,
|
||||
parachain_import_queue,
|
||||
relay_chain_client,
|
||||
para_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a new pending candidate.
|
||||
fn handle_pending_candidate(
|
||||
&mut self,
|
||||
receipt: CommittedCandidateReceipt,
|
||||
session_index: SessionIndex,
|
||||
) {
|
||||
let header = match Block::Header::decode(&mut &receipt.commitments.head_data.0[..]) {
|
||||
Ok(header) => header,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
error = ?e,
|
||||
"Failed to decode parachain header from pending candidate",
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if *header.number() <= self.parachain_client.usage_info().chain.finalized_number {
|
||||
return;
|
||||
}
|
||||
|
||||
let hash = header.hash();
|
||||
match self.parachain_client.block_status(&BlockId::Hash(hash)) {
|
||||
Ok(BlockStatus::Unknown) => (),
|
||||
// Any other state means, we should ignore it.
|
||||
Ok(_) => return,
|
||||
Err(e) => {
|
||||
tracing::debug!(
|
||||
target: "cumulus-consensus",
|
||||
error = ?e,
|
||||
block_hash = ?hash,
|
||||
"Failed to get block status",
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if self
|
||||
.pending_candidates
|
||||
.insert(
|
||||
hash,
|
||||
PendingCandidate {
|
||||
block_number: *header.number(),
|
||||
receipt: receipt.to_plain(),
|
||||
session_index,
|
||||
},
|
||||
)
|
||||
.is_some()
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// Wait some random time, with the maximum being the slot duration of the relay chain
|
||||
// before we start to recover the candidate.
|
||||
let delay = Delay::new(self.relay_chain_slot_duration.mul_f64(thread_rng().gen()));
|
||||
self.next_candidate_to_recover.push(
|
||||
async move {
|
||||
delay.await;
|
||||
hash
|
||||
}
|
||||
.boxed(),
|
||||
);
|
||||
}
|
||||
|
||||
/// Handle an imported block.
|
||||
fn handle_block_imported(&mut self, hash: &Block::Hash) {
|
||||
self.pending_candidates.remove(&hash);
|
||||
}
|
||||
|
||||
/// Handle a finalized block with the given `block_number`.
|
||||
fn handle_block_finalized(&mut self, block_number: NumberFor<Block>) {
|
||||
self.pending_candidates
|
||||
.retain(|_, pc| pc.block_number > block_number);
|
||||
}
|
||||
|
||||
/// Recover the candidate for the given `block_hash`.
|
||||
async fn recover_candidate(&mut self, block_hash: Block::Hash) {
|
||||
let pending_candidate = match self.pending_candidates.remove(&block_hash) {
|
||||
Some(pending_candidate) => pending_candidate,
|
||||
None => return,
|
||||
};
|
||||
|
||||
self.active_candidate_recovery
|
||||
.recover_candidate(block_hash, pending_candidate)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Clear `waiting_for_parent` from the given `hash` and do this recursively for all child
|
||||
/// blocks.
|
||||
fn clear_waiting_for_parent(&mut self, hash: Block::Hash) {
|
||||
let mut blocks_to_delete = vec![hash];
|
||||
|
||||
while let Some(delete) = blocks_to_delete.pop() {
|
||||
if let Some(childs) = self.waiting_for_parent.remove(&delete) {
|
||||
blocks_to_delete.extend(childs.iter().map(BlockT::hash));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a recovered candidate.
|
||||
async fn handle_candidate_recovered(
|
||||
&mut self,
|
||||
block_hash: Block::Hash,
|
||||
available_data: Option<AvailableData>,
|
||||
) {
|
||||
let available_data = match available_data {
|
||||
Some(data) => data,
|
||||
None => {
|
||||
self.clear_waiting_for_parent(block_hash);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let raw_block_data = match sp_maybe_compressed_blob::decompress(
|
||||
&available_data.pov.block_data.0,
|
||||
POV_BOMB_LIMIT,
|
||||
) {
|
||||
Ok(r) => r,
|
||||
Err(error) => {
|
||||
tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");
|
||||
|
||||
self.clear_waiting_for_parent(block_hash);
|
||||
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let block_data = match ParachainBlockData::<Block>::decode(&mut &raw_block_data[..]) {
|
||||
Ok(d) => d,
|
||||
Err(error) => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
?error,
|
||||
"Failed to decode parachain block data from recovered PoV",
|
||||
);
|
||||
|
||||
self.clear_waiting_for_parent(block_hash);
|
||||
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let block = block_data.into_block();
|
||||
|
||||
let parent = *block.header().parent_hash();
|
||||
|
||||
match self.parachain_client.block_status(&BlockId::hash(parent)) {
|
||||
Ok(BlockStatus::Unknown) => {
|
||||
if self.active_candidate_recovery.is_being_recovered(&parent) {
|
||||
tracing::debug!(
|
||||
target: "cumulus-consensus",
|
||||
?block_hash,
|
||||
parent_hash = ?parent,
|
||||
"Parent is still being recovered, waiting.",
|
||||
);
|
||||
|
||||
self.waiting_for_parent
|
||||
.entry(parent)
|
||||
.or_default()
|
||||
.push(block);
|
||||
return;
|
||||
} else {
|
||||
tracing::debug!(
|
||||
target: "cumulus-consensus",
|
||||
?block_hash,
|
||||
parent_hash = ?parent,
|
||||
"Parent not found while trying to import recovered block.",
|
||||
);
|
||||
|
||||
self.clear_waiting_for_parent(block_hash);
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
tracing::debug!(
|
||||
target: "cumulus-consensus",
|
||||
block_hash = ?parent,
|
||||
?error,
|
||||
"Error while checking block status",
|
||||
);
|
||||
|
||||
self.clear_waiting_for_parent(block_hash);
|
||||
|
||||
return;
|
||||
}
|
||||
// Any other status is fine to "ignore/accept"
|
||||
_ => (),
|
||||
}
|
||||
|
||||
self.import_block(block).await;
|
||||
}
|
||||
|
||||
/// Import the given `block`.
|
||||
///
|
||||
/// This will also recursivley drain `waiting_for_parent` and import them as well.
|
||||
async fn import_block(&mut self, block: Block) {
|
||||
let mut blocks = VecDeque::new();
|
||||
blocks.push_back(block);
|
||||
|
||||
let mut incoming_blocks = Vec::new();
|
||||
|
||||
while let Some(block) = blocks.pop_front() {
|
||||
let block_hash = block.hash();
|
||||
let (header, body) = block.deconstruct();
|
||||
|
||||
incoming_blocks.push(IncomingBlock {
|
||||
hash: block_hash,
|
||||
header: Some(header),
|
||||
body: Some(body),
|
||||
import_existing: false,
|
||||
allow_missing_state: false,
|
||||
justifications: None,
|
||||
origin: None,
|
||||
});
|
||||
|
||||
if let Some(waiting) = self.waiting_for_parent.remove(&block_hash) {
|
||||
blocks.extend(waiting);
|
||||
}
|
||||
}
|
||||
|
||||
self.parachain_import_queue
|
||||
.import_blocks(BlockOrigin::ConsensusBroadcast, incoming_blocks);
|
||||
}
|
||||
|
||||
/// Run the pov-recovery.
|
||||
pub async fn run(mut self) {
|
||||
let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
|
||||
let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
|
||||
let pending_candidates =
|
||||
pending_candidates(self.relay_chain_client.clone(), self.para_id).fuse();
|
||||
futures::pin_mut!(pending_candidates);
|
||||
|
||||
loop {
|
||||
select! {
|
||||
pending_candidate = pending_candidates.next() => {
|
||||
if let Some((receipt, session_index)) = pending_candidate {
|
||||
self.handle_pending_candidate(receipt, session_index);
|
||||
} else {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
"Pending candidates stream ended",
|
||||
);
|
||||
return;
|
||||
}
|
||||
},
|
||||
imported = imported_blocks.next() => {
|
||||
if let Some(imported) = imported {
|
||||
self.handle_block_imported(&imported.hash);
|
||||
} else {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
"Imported blocks stream ended",
|
||||
);
|
||||
return;
|
||||
}
|
||||
},
|
||||
finalized = finalized_blocks.next() => {
|
||||
if let Some(finalized) = finalized {
|
||||
self.handle_block_finalized(*finalized.header.number());
|
||||
} else {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
"Finalized blocks stream ended",
|
||||
);
|
||||
return;
|
||||
}
|
||||
},
|
||||
next_to_recover = self.next_candidate_to_recover.next() => {
|
||||
if let Some(block_hash) = next_to_recover {
|
||||
self.recover_candidate(block_hash).await;
|
||||
}
|
||||
},
|
||||
(block_hash, available_data) =
|
||||
self.active_candidate_recovery.wait_for_recovery().fuse() =>
|
||||
{
|
||||
self.handle_candidate_recovered(block_hash, available_data).await;
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a stream over pending candidates for the parachain corresponding to `para_id`.
|
||||
fn pending_candidates<RC>(
|
||||
relay_chain_client: Arc<RC>,
|
||||
para_id: ParaId,
|
||||
) -> impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)>
|
||||
where
|
||||
RC: ProvideRuntimeApi<PBlock> + BlockchainEvents<PBlock>,
|
||||
RC::Api: ParachainHost<PBlock>,
|
||||
{
|
||||
relay_chain_client
|
||||
.import_notification_stream()
|
||||
.filter_map(move |n| {
|
||||
let runtime_api = relay_chain_client.runtime_api();
|
||||
let res = runtime_api
|
||||
.candidate_pending_availability(&BlockId::hash(n.hash), para_id)
|
||||
.and_then(|pa| {
|
||||
runtime_api
|
||||
.session_index_for_child(&BlockId::hash(n.hash))
|
||||
.map(|v| pa.map(|pa| (pa, v)))
|
||||
})
|
||||
.map_err(|e| {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
error = ?e,
|
||||
"Failed fetch pending candidates.",
|
||||
)
|
||||
})
|
||||
.ok()
|
||||
.flatten();
|
||||
|
||||
async move { res }
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,95 @@
|
||||
// Copyright 2021 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use cumulus_primitives_core::ParaId;
|
||||
use cumulus_test_service::{initial_head_data, Keyring::*};
|
||||
use futures::join;
|
||||
use sc_service::TaskExecutor;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Tests the PoV recovery.
|
||||
///
|
||||
/// If there is a block of the parachain included/backed by the relay chain that isn't circulated in
|
||||
/// the parachain network, we need to recover the PoV from the relay chain. Using this PoV we can
|
||||
/// recover the block, import it and share it with the other nodes of the parachain network.
|
||||
#[substrate_test_utils::test]
|
||||
async fn pov_recovery(task_executor: TaskExecutor) {
|
||||
let mut builder = sc_cli::LoggerBuilder::new("");
|
||||
builder.with_colors(false);
|
||||
let _ = builder.init();
|
||||
|
||||
let para_id = ParaId::from(100);
|
||||
|
||||
// Start alice
|
||||
let alice = cumulus_test_service::run_relay_chain_validator_node(
|
||||
task_executor.clone(),
|
||||
Alice,
|
||||
|| {},
|
||||
vec![],
|
||||
);
|
||||
|
||||
// Start bob
|
||||
let bob = cumulus_test_service::run_relay_chain_validator_node(
|
||||
task_executor.clone(),
|
||||
Bob,
|
||||
|| {},
|
||||
vec![alice.addr.clone()],
|
||||
);
|
||||
|
||||
// Register parachain
|
||||
alice
|
||||
.register_parachain(
|
||||
para_id,
|
||||
cumulus_test_service::runtime::WASM_BINARY
|
||||
.expect("You need to build the WASM binary to run this test!")
|
||||
.to_vec(),
|
||||
initial_head_data(para_id),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Run charlie as parachain collator
|
||||
let charlie =
|
||||
cumulus_test_service::TestNodeBuilder::new(para_id, task_executor.clone(), Charlie)
|
||||
.enable_collator()
|
||||
.connect_to_relay_chain_nodes(vec![&alice, &bob])
|
||||
.wrap_announce_block(|_| {
|
||||
// Never announce any block
|
||||
Arc::new(|_, _| {})
|
||||
})
|
||||
.build()
|
||||
.await;
|
||||
|
||||
// Run dave as parachain full node
|
||||
//
|
||||
// It will need to recover the pov blocks through availability recovery.
|
||||
let dave = cumulus_test_service::TestNodeBuilder::new(para_id, task_executor, Dave)
|
||||
.enable_collator()
|
||||
.use_null_consensus()
|
||||
.connect_to_parachain_node(&charlie)
|
||||
.connect_to_relay_chain_nodes(vec![&alice, &bob])
|
||||
.build()
|
||||
.await;
|
||||
|
||||
dave.wait_for_blocks(7).await;
|
||||
|
||||
join!(
|
||||
alice.task_manager.clean_shutdown(),
|
||||
bob.task_manager.clean_shutdown(),
|
||||
charlie.task_manager.clean_shutdown(),
|
||||
dave.task_manager.clean_shutdown(),
|
||||
);
|
||||
}
|
||||
Reference in New Issue
Block a user