Wait for block import in parachain consensus (#271)

* Wait for block import in parachain consensus

There was a bug in the parachain consensus that when importing a relay
chain block that sets a new best parachain block, but the required
parachain block was not yet imported. This pr fixes this by waiting for
the block to be imported.

* Finish docs
This commit is contained in:
Bastian Köcher
2021-01-05 23:14:27 +01:00
committed by GitHub
parent 518c6fac33
commit 9f0085c097
14 changed files with 812 additions and 316 deletions
+611 -145
View File
@@ -14,7 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use sc_client_api::{Backend, BlockBackend, Finalizer, UsageProvider};
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_consensus::{
@@ -27,12 +29,11 @@ use sp_runtime::{
};
use polkadot_primitives::v1::{
Block as PBlock, Hash as PHash, Id as ParaId, OccupiedCoreAssumption, ParachainHost,
Block as PBlock, Id as ParaId, OccupiedCoreAssumption, ParachainHost,
};
use codec::Decode;
use futures::{future, Future, FutureExt, Stream, StreamExt};
use log::{error, trace, warn};
use futures::{future, select, FutureExt, Stream, StreamExt};
use std::{marker::PhantomData, sync::Arc};
@@ -47,17 +48,8 @@ pub enum Error {
InvalidHeadData,
}
/// A parachain head update.
pub struct HeadUpdate {
/// The relay-chain's block hash where the parachain head updated.
pub relay_hash: PHash,
/// The parachain head-data.
pub head_data: Vec<u8>,
}
/// Helper for the Polkadot client. This is expected to be a lightweight handle
/// like an `Arc`.
pub trait PolkadotClient: Clone + 'static {
/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
pub trait RelaychainClient: Clone + 'static {
/// The error type for interacting with the Polkadot client.
type Error: std::fmt::Debug + Send;
@@ -78,165 +70,319 @@ pub trait PolkadotClient: Clone + 'static {
) -> ClientResult<Option<Vec<u8>>>;
}
/// Finalize the given block in the Parachain.
fn finalize_block<T, Block, B>(client: &T, hash: Block::Hash) -> ClientResult<bool>
/// Follow the finalized head of the given parachain.
///
/// For every finalized block of the relay chain, it will get the included parachain header
/// corresponding to `para_id` and will finalize it in the parachain.
async fn follow_finalized_head<P, Block, B, R>(
para_id: ParaId,
parachain: Arc<P>,
relay_chain: R,
) -> ClientResult<()>
where
Block: BlockT,
T: Finalizer<Block, B> + UsageProvider<Block>,
P: Finalizer<Block, B> + UsageProvider<Block>,
R: RelaychainClient,
B: Backend<Block>,
{
// don't finalize the same block multiple times.
if client.usage_info().chain.finalized_hash != hash {
match client.finalize_block(BlockId::hash(hash), None, true) {
Ok(()) => Ok(true),
Err(e) => match e {
ClientError::UnknownBlock(_) => Ok(false),
_ => Err(e),
},
let mut finalized_heads = relay_chain.finalized_heads(para_id)?;
loop {
let finalized_head = if let Some(h) = finalized_heads.next().await {
h
} else {
tracing::debug!(target: "cumulus-consensus", "Stopping following finalized head.");
return Ok(());
};
let header = match Block::Header::decode(&mut &finalized_head[..]) {
Ok(header) => header,
Err(err) => {
tracing::warn!(
target: "cumulus-consensus",
error = ?err,
"Could not decode parachain header while following finalized heads.",
);
continue;
}
};
let hash = header.hash();
// don't finalize the same block multiple times.
if parachain.usage_info().chain.finalized_hash != hash {
if let Err(e) = parachain.finalize_block(BlockId::hash(hash), None, true) {
match e {
ClientError::UnknownBlock(_) => tracing::debug!(
target: "cumulus-consensus",
block_hash = ?hash,
"Could not finalize block because it is unknown.",
),
_ => tracing::warn!(
target: "cumulus-consensus",
error = ?e,
block_hash = ?hash,
"Failed to finalize block",
),
}
}
}
} else {
Ok(true)
}
}
/// Spawns a future that follows the Polkadot relay chain for the given parachain.
pub fn follow_polkadot<L, P, Block, B>(
/// Run the parachain consensus.
///
/// This will follow the given `relay_chain` to act as consesus for the parachain that corresponds
/// to the given `para_id`. It will set the new best block of the parachain as it gets aware of it.
/// The same happens for the finalized block.
///
/// # Note
///
/// This will access the backend of the parachain and thus, this future should be spawned as blocking
/// task.
pub async fn run_parachain_consensus<P, R, Block, B>(
para_id: ParaId,
local: Arc<L>,
polkadot: P,
parachain: Arc<P>,
relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
) -> ClientResult<impl Future<Output = ()> + Send + Unpin>
) -> ClientResult<()>
where
Block: BlockT,
L: Finalizer<Block, B> + UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
for<'a> &'a L: BlockImport<Block>,
P: PolkadotClient,
P: Finalizer<Block, B>
+ UsageProvider<Block>
+ Send
+ Sync
+ BlockBackend<Block>
+ BlockchainEvents<Block>,
for<'a> &'a P: BlockImport<Block>,
R: RelaychainClient,
B: Backend<Block>,
{
let follow_finalized = {
let local = local.clone();
polkadot
.finalized_heads(para_id)?
.filter_map(|head_data| {
let res = match <<Block as BlockT>::Header>::decode(&mut &head_data[..]) {
Ok(header) => Some(header),
Err(err) => {
warn!(
target: "cumulus-consensus",
"Could not decode Parachain header for finalizing: {:?}",
err,
);
None
}
};
future::ready(res)
})
.for_each(move |p_head| {
if let Err(e) = finalize_block(&*local, p_head.hash()) {
warn!(
target: "cumulus-consensus",
"Failed to finalize block: {:?}",
e,
);
}
future::ready(())
})
};
Ok(future::select(
follow_finalized,
follow_new_best(para_id, local, polkadot, announce_block)?,
)
.map(|_| ()))
let follow_new_best = follow_new_best(
para_id,
parachain.clone(),
relay_chain.clone(),
announce_block,
);
let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain);
select! {
r = follow_new_best.fuse() => r,
r = follow_finalized_head.fuse() => r,
}
}
/// Follow the relay chain new best head, to update the Parachain new best head.
fn follow_new_best<L, P, Block, B>(
async fn follow_new_best<P, R, Block, B>(
para_id: ParaId,
local: Arc<L>,
polkadot: P,
parachain: Arc<P>,
relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
) -> ClientResult<impl Future<Output = ()> + Send + Unpin>
) -> ClientResult<()>
where
Block: BlockT,
L: Finalizer<Block, B> + UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
for<'a> &'a L: BlockImport<Block>,
P: PolkadotClient,
P: Finalizer<Block, B>
+ UsageProvider<Block>
+ Send
+ Sync
+ BlockBackend<Block>
+ BlockchainEvents<Block>,
for<'a> &'a P: BlockImport<Block>,
R: RelaychainClient,
B: Backend<Block>,
{
Ok(polkadot
.new_best_heads(para_id)?
.filter_map(|head_data| {
let res = match <<Block as BlockT>::Header>::decode(&mut &head_data[..]) {
Ok(header) => Some(header),
Err(err) => {
warn!(
target: "cumulus-consensus",
"Could not decode Parachain header: {:?}", err);
None
let mut new_best_heads = relay_chain.new_best_heads(para_id)?.fuse();
let mut imported_blocks = parachain.import_notification_stream().fuse();
// The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain
// block before the parachain block it included. In this case we need to wait for this block to
// be imported to set it as new best.
let mut unset_best_header = None;
loop {
select! {
h = new_best_heads.next() => {
match h {
Some(h) => handle_new_best_parachain_head(
h,
&*parachain,
&*announce_block,
&mut unset_best_header,
),
None => {
tracing::debug!(
target: "cumulus-consensus",
"Stopping following new best.",
);
return Ok(())
}
}
};
future::ready(res)
})
.for_each(move |h| {
let hash = h.hash();
if local.usage_info().chain.best_hash == hash {
trace!(
target: "cumulus-consensus",
"Skipping set new best block, because block `{}` is already the best.",
hash,
)
} else {
// Make sure the block is already known or otherwise we skip setting new best.
match local.block_status(&BlockId::Hash(hash)) {
Ok(BlockStatus::InChainWithState) => {
// Make it the new best block
let mut block_import_params =
BlockImportParams::new(BlockOrigin::ConsensusBroadcast, h);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(true));
block_import_params.import_existing = true;
if let Err(err) =
(&*local).import_block(block_import_params, Default::default())
{
warn!(
target: "cumulus-consensus",
"Failed to set new best block `{}` with error: {:?}",
hash, err
);
}
(*announce_block)(hash, Vec::new());
}
Ok(BlockStatus::InChainPruned) => {
error!(
target: "cumulus-collator",
"Trying to set pruned block `{:?}` as new best!",
hash,
},
i = imported_blocks.next() => {
match i {
Some(i) => handle_new_block_imported(
i,
&mut unset_best_header,
&*parachain,
&*announce_block,
),
None => {
tracing::debug!(
target: "cumulus-consensus",
"Stopping following imported blocks.",
);
return Ok(())
}
Err(e) => {
error!(
target: "cumulus-collator",
"Failed to get block status of block `{:?}`: {:?}",
hash,
e,
);
}
_ => {}
}
}
future::ready(())
}))
}
}
}
impl<T> PolkadotClient for Arc<T>
/// Handle a new import block of the parachain.
fn handle_new_block_imported<Block, P>(
notification: BlockImportNotification<Block>,
unset_best_header_opt: &mut Option<Block::Header>,
parachain: &P,
announce_block: &dyn Fn(Block::Hash, Vec<u8>),
) where
Block: BlockT,
P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
for<'a> &'a P: BlockImport<Block>,
{
let unset_best_header = match (notification.is_new_best, &unset_best_header_opt) {
// If this is the new best block or we don't have any unset block, we can end it here.
(true, _) | (_, None) => return,
(false, Some(ref u)) => u,
};
let unset_hash = if notification.header.number() < unset_best_header.number() {
return;
} else if notification.header.number() == unset_best_header.number() {
let unset_hash = unset_best_header.hash();
if unset_hash != notification.hash {
return;
} else {
unset_hash
}
} else {
unset_best_header.hash()
};
match parachain.block_status(&BlockId::Hash(unset_hash)) {
Ok(BlockStatus::InChainWithState) => {
drop(unset_best_header);
let unset_best_header = unset_best_header_opt
.take()
.expect("We checked above that the value is set; qed");
import_block_as_new_best(unset_hash, unset_best_header, parachain, announce_block);
}
state => tracing::debug!(
target: "cumulus-consensus",
unset_best_header = ?unset_best_header,
imported_header = ?notification.header,
?state,
"Unexpected state for unset best header.",
),
}
}
/// Handle the new best parachain head as extracted from the new best relay chain.
fn handle_new_best_parachain_head<Block, P>(
head: Vec<u8>,
parachain: &P,
announce_block: &dyn Fn(Block::Hash, Vec<u8>),
unset_best_header: &mut Option<Block::Header>,
) where
Block: BlockT,
P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
for<'a> &'a P: BlockImport<Block>,
{
let parachain_head = match <<Block as BlockT>::Header>::decode(&mut &head[..]) {
Ok(header) => header,
Err(err) => {
tracing::warn!(
target: "cumulus-consensus",
error = ?err,
"Could not decode Parachain header while following best heads.",
);
return;
}
};
let hash = parachain_head.hash();
if parachain.usage_info().chain.best_hash == hash {
tracing::debug!(
target: "cumulus-consensus",
block_hash = ?hash,
"Skipping set new best block, because block is already the best.",
)
} else {
// Make sure the block is already known or otherwise we skip setting new best.
match parachain.block_status(&BlockId::Hash(hash)) {
Ok(BlockStatus::InChainWithState) => {
unset_best_header.take();
import_block_as_new_best(hash, parachain_head, parachain, announce_block);
}
Ok(BlockStatus::InChainPruned) => {
tracing::error!(
target: "cumulus-collator",
block_hash = ?hash,
"Trying to set pruned block as new best!",
);
}
Ok(BlockStatus::Unknown) => {
*unset_best_header = Some(parachain_head);
tracing::debug!(
target: "cumulus-collator",
block_hash = ?hash,
"Parachain block not yet imported, waiting for import to enact as best block.",
);
}
Err(e) => {
tracing::error!(
target: "cumulus-collator",
block_hash = ?hash,
error = ?e,
"Failed to get block status of block.",
);
}
_ => {}
}
}
}
fn import_block_as_new_best<Block, P>(
hash: Block::Hash,
header: Block::Header,
parachain: &P,
announce_block: &dyn Fn(Block::Hash, Vec<u8>),
) where
Block: BlockT,
P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
for<'a> &'a P: BlockImport<Block>,
{
// Make it the new best block
let mut block_import_params = BlockImportParams::new(BlockOrigin::ConsensusBroadcast, header);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(true));
block_import_params.import_existing = true;
if let Err(err) = (&*parachain).import_block(block_import_params, Default::default()) {
tracing::warn!(
target: "cumulus-consensus",
block_hash = ?hash,
error = ?err,
"Failed to set new best block.",
);
} else {
(*announce_block)(hash, Vec::new());
}
}
impl<T> RelaychainClient for Arc<T>
where
T: sc_client_api::BlockchainEvents<PBlock> + ProvideRuntimeApi<PBlock> + 'static + Send + Sync,
<T as ProvideRuntimeApi<PBlock>>::Api: ParachainHost<PBlock, Error = ClientError>,
@@ -329,7 +475,7 @@ impl<Block, PC: Clone, SC: Clone> Clone for SelectChain<Block, PC, SC> {
impl<Block, PC, SC> SelectChainT<Block> for SelectChain<Block, PC, SC>
where
Block: BlockT,
PC: PolkadotClient + Clone + Send + Sync,
PC: RelaychainClient + Clone + Send + Sync,
PC::Error: ToString,
SC: SelectChainT<PBlock>,
{
@@ -364,3 +510,323 @@ where
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use codec::Encode;
use cumulus_test_client::{
runtime::{Block, Header},
Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt,
};
use futures::{channel::mpsc, executor::block_on};
use futures_timer::Delay;
use std::{sync::Mutex, time::Duration};
struct RelaychainInner {
new_best_heads: Option<mpsc::UnboundedReceiver<Header>>,
finalized_heads: Option<mpsc::UnboundedReceiver<Header>>,
new_best_heads_sender: mpsc::UnboundedSender<Header>,
finalized_heads_sender: mpsc::UnboundedSender<Header>,
}
impl RelaychainInner {
fn new() -> Self {
let (new_best_heads_sender, new_best_heads) = mpsc::unbounded();
let (finalized_heads_sender, finalized_heads) = mpsc::unbounded();
Self {
new_best_heads_sender,
finalized_heads_sender,
new_best_heads: Some(new_best_heads),
finalized_heads: Some(finalized_heads),
}
}
}
#[derive(Clone)]
struct Relaychain {
inner: Arc<Mutex<RelaychainInner>>,
}
impl Relaychain {
fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(RelaychainInner::new())),
}
}
}
impl RelaychainClient for Relaychain {
type Error = ClientError;
type HeadStream = Box<dyn Stream<Item = Vec<u8>> + Send + Unpin>;
fn new_best_heads(&self, _: ParaId) -> ClientResult<Self::HeadStream> {
let stream = self
.inner
.lock()
.unwrap()
.new_best_heads
.take()
.expect("Should only be called once");
Ok(Box::new(stream.map(|v| v.encode())))
}
fn finalized_heads(&self, _: ParaId) -> ClientResult<Self::HeadStream> {
let stream = self
.inner
.lock()
.unwrap()
.finalized_heads
.take()
.expect("Should only be called once");
Ok(Box::new(stream.map(|v| v.encode())))
}
fn parachain_head_at(
&self,
_: &BlockId<PBlock>,
_: ParaId,
) -> ClientResult<Option<Vec<u8>>> {
unimplemented!("Not required for tests")
}
}
fn build_and_import_block(mut client: Arc<Client>) -> Block {
let builder = client.init_block_builder(None);
let block = builder.build().unwrap().block;
let (header, body) = block.clone().deconstruct();
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(false));
block_import_params.body = Some(body);
client
.import_block(block_import_params, Default::default())
.unwrap();
assert_eq!(0, client.chain_info().best_number);
block
}
#[test]
fn follow_new_best_works() {
sp_tracing::try_init_simple();
let client = Arc::new(TestClientBuilder::default().build());
let block = build_and_import_block(client.clone());
let relay_chain = Relaychain::new();
let new_best_heads_sender = relay_chain
.inner
.lock()
.unwrap()
.new_best_heads_sender
.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
new_best_heads_sender
.unbounded_send(block.header().clone())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.best_hash {
break;
}
}
};
block_on(async move {
futures::pin_mut!(consensus);
futures::pin_mut!(work);
select! {
r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
_ = work.fuse() => {},
}
});
}
#[test]
fn follow_finalized_works() {
sp_tracing::try_init_simple();
let client = Arc::new(TestClientBuilder::default().build());
let block = build_and_import_block(client.clone());
let relay_chain = Relaychain::new();
let finalized_sender = relay_chain
.inner
.lock()
.unwrap()
.finalized_heads_sender
.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
finalized_sender
.unbounded_send(block.header().clone())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.finalized_hash {
break;
}
}
};
block_on(async move {
futures::pin_mut!(consensus);
futures::pin_mut!(work);
select! {
r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
_ = work.fuse() => {},
}
});
}
#[test]
fn follow_finalized_does_not_stop_on_unknown_block() {
sp_tracing::try_init_simple();
let client = Arc::new(TestClientBuilder::default().build());
let block = build_and_import_block(client.clone());
let unknown_block = {
let block_builder = client.init_block_builder_at(&BlockId::Hash(block.hash()), None);
block_builder.build().unwrap().block
};
let relay_chain = Relaychain::new();
let finalized_sender = relay_chain
.inner
.lock()
.unwrap()
.finalized_heads_sender
.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
for _ in 0..3usize {
finalized_sender
.unbounded_send(unknown_block.header().clone())
.unwrap();
Delay::new(Duration::from_millis(100)).await;
}
finalized_sender
.unbounded_send(block.header().clone())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.finalized_hash {
break;
}
}
};
block_on(async move {
futures::pin_mut!(consensus);
futures::pin_mut!(work);
select! {
r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
_ = work.fuse() => {},
}
});
}
// It can happen that we first import a relay chain block, while not yet having the parachain
// block imported that would be set to the best block. We need to make sure to import this
// block as new best block in the moment it is imported.
#[test]
fn follow_new_best_sets_best_after_it_is_imported() {
sp_tracing::try_init_simple();
let mut client = Arc::new(TestClientBuilder::default().build());
let block = build_and_import_block(client.clone());
let unknown_block = {
let block_builder = client.init_block_builder_at(&BlockId::Hash(block.hash()), None);
block_builder.build().unwrap().block
};
let relay_chain = Relaychain::new();
let new_best_heads_sender = relay_chain
.inner
.lock()
.unwrap()
.new_best_heads_sender
.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
new_best_heads_sender
.unbounded_send(block.header().clone())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.best_hash {
break;
}
}
// Announce the unknown block
new_best_heads_sender
.unbounded_send(unknown_block.header().clone())
.unwrap();
// Do some iterations. As this is a local task executor, only one task can run at a time.
// Meaning that it should already have processed the unknown block.
for _ in 0..3usize {
Delay::new(Duration::from_millis(100)).await;
}
let (header, body) = unknown_block.clone().deconstruct();
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(false));
block_import_params.body = Some(body);
// Now import the unkown block to make it "known"
client
.import_block(block_import_params, Default::default())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if unknown_block.hash() == client.usage_info().chain.best_hash {
break;
}
}
};
block_on(async move {
futures::pin_mut!(consensus);
futures::pin_mut!(work);
select! {
r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
_ = work.fuse() => {},
}
});
}
}