Wait for block import in parachain consensus (#271)

* Wait for block import in parachain consensus

There was a bug in the parachain consensus that when importing a relay
chain block that sets a new best parachain block, but the required
parachain block was not yet imported. This pr fixes this by waiting for
the block to be imported.

* Finish docs
This commit is contained in:
Bastian Köcher
2021-01-05 23:14:27 +01:00
committed by GitHub
parent ed8fc4f4a3
commit 9dc7cc5735
14 changed files with 812 additions and 316 deletions
+6 -4
View File
@@ -1034,7 +1034,6 @@ dependencies = [
name = "cumulus-collator" name = "cumulus-collator"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"cumulus-consensus",
"cumulus-network", "cumulus-network",
"cumulus-primitives", "cumulus-primitives",
"cumulus-runtime", "cumulus-runtime",
@@ -1054,7 +1053,6 @@ dependencies = [
"polkadot-service", "polkadot-service",
"polkadot-test-client", "polkadot-test-client",
"polkadot-validation", "polkadot-validation",
"sc-block-builder",
"sc-cli", "sc-cli",
"sc-client-api", "sc-client-api",
"sp-api", "sp-api",
@@ -1073,8 +1071,10 @@ dependencies = [
name = "cumulus-consensus" name = "cumulus-consensus"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"cumulus-test-client",
"cumulus-test-runtime",
"futures 0.3.8", "futures 0.3.8",
"log", "futures-timer 3.0.2",
"parity-scale-codec", "parity-scale-codec",
"polkadot-primitives", "polkadot-primitives",
"polkadot-runtime", "polkadot-runtime",
@@ -1086,8 +1086,10 @@ dependencies = [
"sp-core", "sp-core",
"sp-inherents", "sp-inherents",
"sp-runtime", "sp-runtime",
"sp-tracing",
"substrate-prometheus-endpoint", "substrate-prometheus-endpoint",
"tokio 0.1.22", "tokio 0.1.22",
"tracing",
] ]
[[package]] [[package]]
@@ -1184,7 +1186,6 @@ dependencies = [
"memory-db", "memory-db",
"parity-scale-codec", "parity-scale-codec",
"polkadot-parachain", "polkadot-parachain",
"sc-block-builder",
"sc-client-api", "sc-client-api",
"sc-executor", "sc-executor",
"sp-blockchain", "sp-blockchain",
@@ -1220,6 +1221,7 @@ dependencies = [
"sp-core", "sp-core",
"sp-inherents", "sp-inherents",
"sp-runtime", "sp-runtime",
"tracing",
] ]
[[package]] [[package]]
-3
View File
@@ -6,7 +6,6 @@ edition = "2018"
[dependencies] [dependencies]
# Substrate dependencies # Substrate dependencies
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -16,7 +15,6 @@ sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-cli = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-cli = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-block-builder = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot dependencies # Polkadot dependencies
polkadot-service = { git = "https://github.com/paritytech/polkadot", features = [ "real-overseer" ] , branch = "master" } polkadot-service = { git = "https://github.com/paritytech/polkadot", features = [ "real-overseer" ] , branch = "master" }
@@ -28,7 +26,6 @@ polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "
polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot", branch = "master" } polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Cumulus dependencies # Cumulus dependencies
cumulus-consensus = { path = "../consensus" }
cumulus-network = { path = "../network" } cumulus-network = { path = "../network" }
cumulus-primitives = { path = "../primitives" } cumulus-primitives = { path = "../primitives" }
cumulus-runtime = { path = "../runtime" } cumulus-runtime = { path = "../runtime" }
+8 -41
View File
@@ -24,8 +24,7 @@ use cumulus_primitives::{
}; };
use cumulus_runtime::ParachainBlockData; use cumulus_runtime::ParachainBlockData;
use sc_client_api::{BlockBackend, Finalizer, StateBackend, UsageProvider}; use sc_client_api::{BlockBackend, StateBackend};
use sp_blockchain::HeaderBackend;
use sp_consensus::{ use sp_consensus::{
BlockImport, BlockImportParams, BlockOrigin, BlockStatus, Environment, Error as ConsensusError, BlockImport, BlockImportParams, BlockOrigin, BlockStatus, Environment, Error as ConsensusError,
ForkChoiceStrategy, Proposal, Proposer, RecordProof, ForkChoiceStrategy, Proposal, Proposer, RecordProof,
@@ -522,13 +521,12 @@ where
} }
/// Parameters for [`start_collator`]. /// Parameters for [`start_collator`].
pub struct StartCollatorParams<Block: BlockT, PF, BI, Backend, Client, BS, Spawner, PClient, PBackend> { pub struct StartCollatorParams<Block: BlockT, PF, BI, Backend, BS, Spawner, PClient, PBackend> {
pub proposer_factory: PF, pub proposer_factory: PF,
pub inherent_data_providers: InherentDataProviders, pub inherent_data_providers: InherentDataProviders,
pub backend: Arc<Backend>, pub backend: Arc<Backend>,
pub block_import: BI, pub block_import: BI,
pub block_status: Arc<BS>, pub block_status: Arc<BS>,
pub client: Arc<Client>,
pub announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>, pub announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
pub overseer_handler: OverseerHandler, pub overseer_handler: OverseerHandler,
pub spawner: Spawner, pub spawner: Spawner,
@@ -543,7 +541,6 @@ pub async fn start_collator<
PF, PF,
BI, BI,
Backend, Backend,
Client,
BS, BS,
Spawner, Spawner,
PClient, PClient,
@@ -557,7 +554,6 @@ pub async fn start_collator<
backend, backend,
block_import, block_import,
block_status, block_status,
client,
announce_block, announce_block,
mut overseer_handler, mut overseer_handler,
spawner, spawner,
@@ -565,7 +561,7 @@ pub async fn start_collator<
key, key,
polkadot_client, polkadot_client,
polkadot_backend, polkadot_backend,
}: StartCollatorParams<Block, PF, BI, Backend, Client, BS, Spawner, PClient, PBackend2>, }: StartCollatorParams<Block, PF, BI, Backend, BS, Spawner, PClient, PBackend2>,
) -> Result<(), String> ) -> Result<(), String>
where where
PF: Environment<Block> + Send + 'static, PF: Environment<Block> + Send + 'static,
@@ -574,14 +570,6 @@ where
+ Sync + Sync
+ 'static, + 'static,
Backend: sc_client_api::Backend<Block> + 'static, Backend: sc_client_api::Backend<Block> + 'static,
Client: Finalizer<Block, Backend>
+ UsageProvider<Block>
+ HeaderBackend<Block>
+ Send
+ Sync
+ BlockBackend<Block>
+ 'static,
for<'a> &'a Client: BlockImport<Block>,
BS: BlockBackend<Block> + Send + Sync + 'static, BS: BlockBackend<Block> + Send + Sync + 'static,
Spawner: SpawnNamed + Clone + Send + Sync + 'static, Spawner: SpawnNamed + Clone + Send + Sync + 'static,
PBackend: sc_client_api::Backend<PBlock> + 'static, PBackend: sc_client_api::Backend<PBlock> + 'static,
@@ -591,18 +579,6 @@ where
PBackend2: sc_client_api::Backend<PBlock> + 'static, PBackend2: sc_client_api::Backend<PBlock> + 'static,
PBackend2::State: StateBackend<BlakeTwo256>, PBackend2::State: StateBackend<BlakeTwo256>,
{ {
let follow = match cumulus_consensus::follow_polkadot(
para_id,
client,
polkadot_client.clone(),
announce_block.clone(),
) {
Ok(follow) => follow,
Err(e) => return Err(format!("Could not start following polkadot: {:?}", e)),
};
spawner.spawn("cumulus-follow-polkadot", follow.map(|_| ()).boxed());
let collator = Collator::new( let collator = Collator::new(
para_id, para_id,
proposer_factory, proposer_factory,
@@ -644,13 +620,12 @@ mod tests {
use super::*; use super::*;
use std::{pin::Pin, time::Duration}; use std::{pin::Pin, time::Duration};
use sc_block_builder::BlockBuilderProvider;
use sp_core::{testing::TaskExecutor, Pair}; use sp_core::{testing::TaskExecutor, Pair};
use sp_inherents::InherentData; use sp_inherents::InherentData;
use sp_runtime::traits::DigestFor; use sp_runtime::traits::DigestFor;
use cumulus_test_client::{ use cumulus_test_client::{
generate_block_inherents, Client, DefaultTestClientBuilderExt, TestClientBuilder, Client, DefaultTestClientBuilderExt, InitBlockBuilder, TestClientBuilder,
TestClientBuilderExt, TestClientBuilderExt,
}; };
use cumulus_test_runtime::{Block, Header}; use cumulus_test_runtime::{Block, Header};
@@ -700,19 +675,12 @@ mod tests {
fn propose( fn propose(
self, self,
_: InherentData, _: InherentData,
digest: DigestFor<Block>, _: DigestFor<Block>,
_: Duration, _: Duration,
record_proof: RecordProof, _: RecordProof,
) -> Self::Proposal { ) -> Self::Proposal {
let block_id = BlockId::Hash(self.header.hash()); let block_id = BlockId::Hash(self.header.hash());
let mut builder = self let builder = self.client.init_block_builder_at(&block_id, None);
.client
.new_block_at(&block_id, digest, record_proof.yes())
.expect("Initializes new block");
generate_block_inherents(&*self.client, None)
.into_iter()
.for_each(|e| builder.push(e).expect("Pushes an inherent"));
let (block, storage_changes, proof) = let (block, storage_changes, proof) =
builder.build().expect("Creates block").into_inner(); builder.build().expect("Creates block").into_inner();
@@ -766,14 +734,13 @@ mod tests {
}; };
let collator_start = let collator_start =
start_collator::<_, _, _, _, _, _, _, _, polkadot_service::FullBackend, _, _>( start_collator::<_, _, _, _, _, _, _, polkadot_service::FullBackend, _, _>(
StartCollatorParams { StartCollatorParams {
proposer_factory: DummyFactory(client.clone()), proposer_factory: DummyFactory(client.clone()),
inherent_data_providers: Default::default(), inherent_data_providers: Default::default(),
backend, backend,
block_import: client.clone(), block_import: client.clone(),
block_status: client.clone(), block_status: client.clone(),
client: client.clone(),
announce_block: Arc::new(announce_block), announce_block: Arc::new(announce_block),
overseer_handler: handler, overseer_handler: handler,
spawner, spawner,
+16 -5
View File
@@ -6,7 +6,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
# substrate deps # Substrate deps
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -17,12 +17,23 @@ sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "mas
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" } substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" }
# polkadot deps # Polkadot deps
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" } polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-runtime = { git = "https://github.com/paritytech/polkadot", branch = "master" } polkadot-runtime = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# other deps # Other deps
futures = { version = "0.3.1", features = ["compat"] } futures = { version = "0.3.8", features = ["compat"] }
tokio = "0.1.22" tokio = "0.1.22"
codec = { package = "parity-scale-codec", version = "1.3.0", features = [ "derive" ] } codec = { package = "parity-scale-codec", version = "1.3.0", features = [ "derive" ] }
log = "0.4" tracing = "0.1.22"
[dev-dependencies]
# Substrate deps
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Cumulus dependencies
cumulus-test-runtime = { path = "../test/runtime" }
cumulus-test-client = { path = "../test/client" }
# Other deps
futures-timer = "3.0.2"
+591 -125
View File
@@ -14,7 +14,9 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>. // along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use sc_client_api::{Backend, BlockBackend, Finalizer, UsageProvider}; use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
use sp_api::ProvideRuntimeApi; use sp_api::ProvideRuntimeApi;
use sp_blockchain::{Error as ClientError, Result as ClientResult}; use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_consensus::{ use sp_consensus::{
@@ -27,12 +29,11 @@ use sp_runtime::{
}; };
use polkadot_primitives::v1::{ use polkadot_primitives::v1::{
Block as PBlock, Hash as PHash, Id as ParaId, OccupiedCoreAssumption, ParachainHost, Block as PBlock, Id as ParaId, OccupiedCoreAssumption, ParachainHost,
}; };
use codec::Decode; use codec::Decode;
use futures::{future, Future, FutureExt, Stream, StreamExt}; use futures::{future, select, FutureExt, Stream, StreamExt};
use log::{error, trace, warn};
use std::{marker::PhantomData, sync::Arc}; use std::{marker::PhantomData, sync::Arc};
@@ -47,17 +48,8 @@ pub enum Error {
InvalidHeadData, InvalidHeadData,
} }
/// A parachain head update. /// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
pub struct HeadUpdate { pub trait RelaychainClient: Clone + 'static {
/// The relay-chain's block hash where the parachain head updated.
pub relay_hash: PHash,
/// The parachain head-data.
pub head_data: Vec<u8>,
}
/// Helper for the Polkadot client. This is expected to be a lightweight handle
/// like an `Arc`.
pub trait PolkadotClient: Clone + 'static {
/// The error type for interacting with the Polkadot client. /// The error type for interacting with the Polkadot client.
type Error: std::fmt::Debug + Send; type Error: std::fmt::Debug + Send;
@@ -78,165 +70,319 @@ pub trait PolkadotClient: Clone + 'static {
) -> ClientResult<Option<Vec<u8>>>; ) -> ClientResult<Option<Vec<u8>>>;
} }
/// Finalize the given block in the Parachain. /// Follow the finalized head of the given parachain.
fn finalize_block<T, Block, B>(client: &T, hash: Block::Hash) -> ClientResult<bool> ///
/// For every finalized block of the relay chain, it will get the included parachain header
/// corresponding to `para_id` and will finalize it in the parachain.
async fn follow_finalized_head<P, Block, B, R>(
para_id: ParaId,
parachain: Arc<P>,
relay_chain: R,
) -> ClientResult<()>
where where
Block: BlockT, Block: BlockT,
T: Finalizer<Block, B> + UsageProvider<Block>, P: Finalizer<Block, B> + UsageProvider<Block>,
R: RelaychainClient,
B: Backend<Block>, B: Backend<Block>,
{ {
// don't finalize the same block multiple times. let mut finalized_heads = relay_chain.finalized_heads(para_id)?;
if client.usage_info().chain.finalized_hash != hash {
match client.finalize_block(BlockId::hash(hash), None, true) { loop {
Ok(()) => Ok(true), let finalized_head = if let Some(h) = finalized_heads.next().await {
Err(e) => match e { h
ClientError::UnknownBlock(_) => Ok(false),
_ => Err(e),
},
}
} else { } else {
Ok(true) tracing::debug!(target: "cumulus-consensus", "Stopping following finalized head.");
return Ok(());
};
let header = match Block::Header::decode(&mut &finalized_head[..]) {
Ok(header) => header,
Err(err) => {
tracing::warn!(
target: "cumulus-consensus",
error = ?err,
"Could not decode parachain header while following finalized heads.",
);
continue;
}
};
let hash = header.hash();
// don't finalize the same block multiple times.
if parachain.usage_info().chain.finalized_hash != hash {
if let Err(e) = parachain.finalize_block(BlockId::hash(hash), None, true) {
match e {
ClientError::UnknownBlock(_) => tracing::debug!(
target: "cumulus-consensus",
block_hash = ?hash,
"Could not finalize block because it is unknown.",
),
_ => tracing::warn!(
target: "cumulus-consensus",
error = ?e,
block_hash = ?hash,
"Failed to finalize block",
),
}
}
}
} }
} }
/// Spawns a future that follows the Polkadot relay chain for the given parachain. /// Run the parachain consensus.
pub fn follow_polkadot<L, P, Block, B>( ///
/// This will follow the given `relay_chain` to act as consesus for the parachain that corresponds
/// to the given `para_id`. It will set the new best block of the parachain as it gets aware of it.
/// The same happens for the finalized block.
///
/// # Note
///
/// This will access the backend of the parachain and thus, this future should be spawned as blocking
/// task.
pub async fn run_parachain_consensus<P, R, Block, B>(
para_id: ParaId, para_id: ParaId,
local: Arc<L>, parachain: Arc<P>,
polkadot: P, relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>, announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
) -> ClientResult<impl Future<Output = ()> + Send + Unpin> ) -> ClientResult<()>
where where
Block: BlockT, Block: BlockT,
L: Finalizer<Block, B> + UsageProvider<Block> + Send + Sync + BlockBackend<Block>, P: Finalizer<Block, B>
for<'a> &'a L: BlockImport<Block>, + UsageProvider<Block>
P: PolkadotClient, + Send
+ Sync
+ BlockBackend<Block>
+ BlockchainEvents<Block>,
for<'a> &'a P: BlockImport<Block>,
R: RelaychainClient,
B: Backend<Block>, B: Backend<Block>,
{ {
let follow_finalized = { let follow_new_best = follow_new_best(
let local = local.clone(); para_id,
parachain.clone(),
polkadot relay_chain.clone(),
.finalized_heads(para_id)? announce_block,
.filter_map(|head_data| {
let res = match <<Block as BlockT>::Header>::decode(&mut &head_data[..]) {
Ok(header) => Some(header),
Err(err) => {
warn!(
target: "cumulus-consensus",
"Could not decode Parachain header for finalizing: {:?}",
err,
); );
None let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain);
select! {
r = follow_new_best.fuse() => r,
r = follow_finalized_head.fuse() => r,
} }
};
future::ready(res)
})
.for_each(move |p_head| {
if let Err(e) = finalize_block(&*local, p_head.hash()) {
warn!(
target: "cumulus-consensus",
"Failed to finalize block: {:?}",
e,
);
}
future::ready(())
})
};
Ok(future::select(
follow_finalized,
follow_new_best(para_id, local, polkadot, announce_block)?,
)
.map(|_| ()))
} }
/// Follow the relay chain new best head, to update the Parachain new best head. /// Follow the relay chain new best head, to update the Parachain new best head.
fn follow_new_best<L, P, Block, B>( async fn follow_new_best<P, R, Block, B>(
para_id: ParaId, para_id: ParaId,
local: Arc<L>, parachain: Arc<P>,
polkadot: P, relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>, announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
) -> ClientResult<impl Future<Output = ()> + Send + Unpin> ) -> ClientResult<()>
where where
Block: BlockT, Block: BlockT,
L: Finalizer<Block, B> + UsageProvider<Block> + Send + Sync + BlockBackend<Block>, P: Finalizer<Block, B>
for<'a> &'a L: BlockImport<Block>, + UsageProvider<Block>
P: PolkadotClient, + Send
+ Sync
+ BlockBackend<Block>
+ BlockchainEvents<Block>,
for<'a> &'a P: BlockImport<Block>,
R: RelaychainClient,
B: Backend<Block>, B: Backend<Block>,
{ {
Ok(polkadot let mut new_best_heads = relay_chain.new_best_heads(para_id)?.fuse();
.new_best_heads(para_id)? let mut imported_blocks = parachain.import_notification_stream().fuse();
.filter_map(|head_data| { // The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain
let res = match <<Block as BlockT>::Header>::decode(&mut &head_data[..]) { // block before the parachain block it included. In this case we need to wait for this block to
Ok(header) => Some(header), // be imported to set it as new best.
Err(err) => { let mut unset_best_header = None;
warn!(
loop {
select! {
h = new_best_heads.next() => {
match h {
Some(h) => handle_new_best_parachain_head(
h,
&*parachain,
&*announce_block,
&mut unset_best_header,
),
None => {
tracing::debug!(
target: "cumulus-consensus", target: "cumulus-consensus",
"Could not decode Parachain header: {:?}", err); "Stopping following new best.",
None );
return Ok(())
}
}
},
i = imported_blocks.next() => {
match i {
Some(i) => handle_new_block_imported(
i,
&mut unset_best_header,
&*parachain,
&*announce_block,
),
None => {
tracing::debug!(
target: "cumulus-consensus",
"Stopping following imported blocks.",
);
return Ok(())
}
}
}
}
}
}
/// Handle a new import block of the parachain.
fn handle_new_block_imported<Block, P>(
notification: BlockImportNotification<Block>,
unset_best_header_opt: &mut Option<Block::Header>,
parachain: &P,
announce_block: &dyn Fn(Block::Hash, Vec<u8>),
) where
Block: BlockT,
P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
for<'a> &'a P: BlockImport<Block>,
{
let unset_best_header = match (notification.is_new_best, &unset_best_header_opt) {
// If this is the new best block or we don't have any unset block, we can end it here.
(true, _) | (_, None) => return,
(false, Some(ref u)) => u,
};
let unset_hash = if notification.header.number() < unset_best_header.number() {
return;
} else if notification.header.number() == unset_best_header.number() {
let unset_hash = unset_best_header.hash();
if unset_hash != notification.hash {
return;
} else {
unset_hash
}
} else {
unset_best_header.hash()
};
match parachain.block_status(&BlockId::Hash(unset_hash)) {
Ok(BlockStatus::InChainWithState) => {
drop(unset_best_header);
let unset_best_header = unset_best_header_opt
.take()
.expect("We checked above that the value is set; qed");
import_block_as_new_best(unset_hash, unset_best_header, parachain, announce_block);
}
state => tracing::debug!(
target: "cumulus-consensus",
unset_best_header = ?unset_best_header,
imported_header = ?notification.header,
?state,
"Unexpected state for unset best header.",
),
}
}
/// Handle the new best parachain head as extracted from the new best relay chain.
fn handle_new_best_parachain_head<Block, P>(
head: Vec<u8>,
parachain: &P,
announce_block: &dyn Fn(Block::Hash, Vec<u8>),
unset_best_header: &mut Option<Block::Header>,
) where
Block: BlockT,
P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
for<'a> &'a P: BlockImport<Block>,
{
let parachain_head = match <<Block as BlockT>::Header>::decode(&mut &head[..]) {
Ok(header) => header,
Err(err) => {
tracing::warn!(
target: "cumulus-consensus",
error = ?err,
"Could not decode Parachain header while following best heads.",
);
return;
} }
}; };
future::ready(res) let hash = parachain_head.hash();
})
.for_each(move |h| {
let hash = h.hash();
if local.usage_info().chain.best_hash == hash { if parachain.usage_info().chain.best_hash == hash {
trace!( tracing::debug!(
target: "cumulus-consensus", target: "cumulus-consensus",
"Skipping set new best block, because block `{}` is already the best.", block_hash = ?hash,
hash, "Skipping set new best block, because block is already the best.",
) )
} else { } else {
// Make sure the block is already known or otherwise we skip setting new best. // Make sure the block is already known or otherwise we skip setting new best.
match local.block_status(&BlockId::Hash(hash)) { match parachain.block_status(&BlockId::Hash(hash)) {
Ok(BlockStatus::InChainWithState) => { Ok(BlockStatus::InChainWithState) => {
// Make it the new best block unset_best_header.take();
let mut block_import_params =
BlockImportParams::new(BlockOrigin::ConsensusBroadcast, h);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(true));
block_import_params.import_existing = true;
if let Err(err) = import_block_as_new_best(hash, parachain_head, parachain, announce_block);
(&*local).import_block(block_import_params, Default::default())
{
warn!(
target: "cumulus-consensus",
"Failed to set new best block `{}` with error: {:?}",
hash, err
);
}
(*announce_block)(hash, Vec::new());
} }
Ok(BlockStatus::InChainPruned) => { Ok(BlockStatus::InChainPruned) => {
error!( tracing::error!(
target: "cumulus-collator", target: "cumulus-collator",
"Trying to set pruned block `{:?}` as new best!", block_hash = ?hash,
hash, "Trying to set pruned block as new best!",
);
}
Ok(BlockStatus::Unknown) => {
*unset_best_header = Some(parachain_head);
tracing::debug!(
target: "cumulus-collator",
block_hash = ?hash,
"Parachain block not yet imported, waiting for import to enact as best block.",
); );
} }
Err(e) => { Err(e) => {
error!( tracing::error!(
target: "cumulus-collator", target: "cumulus-collator",
"Failed to get block status of block `{:?}`: {:?}", block_hash = ?hash,
hash, error = ?e,
e, "Failed to get block status of block.",
); );
} }
_ => {} _ => {}
} }
} }
future::ready(())
}))
} }
impl<T> PolkadotClient for Arc<T> fn import_block_as_new_best<Block, P>(
hash: Block::Hash,
header: Block::Header,
parachain: &P,
announce_block: &dyn Fn(Block::Hash, Vec<u8>),
) where
Block: BlockT,
P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
for<'a> &'a P: BlockImport<Block>,
{
// Make it the new best block
let mut block_import_params = BlockImportParams::new(BlockOrigin::ConsensusBroadcast, header);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(true));
block_import_params.import_existing = true;
if let Err(err) = (&*parachain).import_block(block_import_params, Default::default()) {
tracing::warn!(
target: "cumulus-consensus",
block_hash = ?hash,
error = ?err,
"Failed to set new best block.",
);
} else {
(*announce_block)(hash, Vec::new());
}
}
impl<T> RelaychainClient for Arc<T>
where where
T: sc_client_api::BlockchainEvents<PBlock> + ProvideRuntimeApi<PBlock> + 'static + Send + Sync, T: sc_client_api::BlockchainEvents<PBlock> + ProvideRuntimeApi<PBlock> + 'static + Send + Sync,
<T as ProvideRuntimeApi<PBlock>>::Api: ParachainHost<PBlock, Error = ClientError>, <T as ProvideRuntimeApi<PBlock>>::Api: ParachainHost<PBlock, Error = ClientError>,
@@ -329,7 +475,7 @@ impl<Block, PC: Clone, SC: Clone> Clone for SelectChain<Block, PC, SC> {
impl<Block, PC, SC> SelectChainT<Block> for SelectChain<Block, PC, SC> impl<Block, PC, SC> SelectChainT<Block> for SelectChain<Block, PC, SC>
where where
Block: BlockT, Block: BlockT,
PC: PolkadotClient + Clone + Send + Sync, PC: RelaychainClient + Clone + Send + Sync,
PC::Error: ToString, PC::Error: ToString,
SC: SelectChainT<PBlock>, SC: SelectChainT<PBlock>,
{ {
@@ -364,3 +510,323 @@ where
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use codec::Encode;
use cumulus_test_client::{
runtime::{Block, Header},
Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt,
};
use futures::{channel::mpsc, executor::block_on};
use futures_timer::Delay;
use std::{sync::Mutex, time::Duration};
struct RelaychainInner {
new_best_heads: Option<mpsc::UnboundedReceiver<Header>>,
finalized_heads: Option<mpsc::UnboundedReceiver<Header>>,
new_best_heads_sender: mpsc::UnboundedSender<Header>,
finalized_heads_sender: mpsc::UnboundedSender<Header>,
}
impl RelaychainInner {
fn new() -> Self {
let (new_best_heads_sender, new_best_heads) = mpsc::unbounded();
let (finalized_heads_sender, finalized_heads) = mpsc::unbounded();
Self {
new_best_heads_sender,
finalized_heads_sender,
new_best_heads: Some(new_best_heads),
finalized_heads: Some(finalized_heads),
}
}
}
#[derive(Clone)]
struct Relaychain {
inner: Arc<Mutex<RelaychainInner>>,
}
impl Relaychain {
fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(RelaychainInner::new())),
}
}
}
impl RelaychainClient for Relaychain {
type Error = ClientError;
type HeadStream = Box<dyn Stream<Item = Vec<u8>> + Send + Unpin>;
fn new_best_heads(&self, _: ParaId) -> ClientResult<Self::HeadStream> {
let stream = self
.inner
.lock()
.unwrap()
.new_best_heads
.take()
.expect("Should only be called once");
Ok(Box::new(stream.map(|v| v.encode())))
}
fn finalized_heads(&self, _: ParaId) -> ClientResult<Self::HeadStream> {
let stream = self
.inner
.lock()
.unwrap()
.finalized_heads
.take()
.expect("Should only be called once");
Ok(Box::new(stream.map(|v| v.encode())))
}
fn parachain_head_at(
&self,
_: &BlockId<PBlock>,
_: ParaId,
) -> ClientResult<Option<Vec<u8>>> {
unimplemented!("Not required for tests")
}
}
fn build_and_import_block(mut client: Arc<Client>) -> Block {
let builder = client.init_block_builder(None);
let block = builder.build().unwrap().block;
let (header, body) = block.clone().deconstruct();
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(false));
block_import_params.body = Some(body);
client
.import_block(block_import_params, Default::default())
.unwrap();
assert_eq!(0, client.chain_info().best_number);
block
}
#[test]
fn follow_new_best_works() {
sp_tracing::try_init_simple();
let client = Arc::new(TestClientBuilder::default().build());
let block = build_and_import_block(client.clone());
let relay_chain = Relaychain::new();
let new_best_heads_sender = relay_chain
.inner
.lock()
.unwrap()
.new_best_heads_sender
.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
new_best_heads_sender
.unbounded_send(block.header().clone())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.best_hash {
break;
}
}
};
block_on(async move {
futures::pin_mut!(consensus);
futures::pin_mut!(work);
select! {
r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
_ = work.fuse() => {},
}
});
}
#[test]
fn follow_finalized_works() {
sp_tracing::try_init_simple();
let client = Arc::new(TestClientBuilder::default().build());
let block = build_and_import_block(client.clone());
let relay_chain = Relaychain::new();
let finalized_sender = relay_chain
.inner
.lock()
.unwrap()
.finalized_heads_sender
.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
finalized_sender
.unbounded_send(block.header().clone())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.finalized_hash {
break;
}
}
};
block_on(async move {
futures::pin_mut!(consensus);
futures::pin_mut!(work);
select! {
r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
_ = work.fuse() => {},
}
});
}
#[test]
fn follow_finalized_does_not_stop_on_unknown_block() {
sp_tracing::try_init_simple();
let client = Arc::new(TestClientBuilder::default().build());
let block = build_and_import_block(client.clone());
let unknown_block = {
let block_builder = client.init_block_builder_at(&BlockId::Hash(block.hash()), None);
block_builder.build().unwrap().block
};
let relay_chain = Relaychain::new();
let finalized_sender = relay_chain
.inner
.lock()
.unwrap()
.finalized_heads_sender
.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
for _ in 0..3usize {
finalized_sender
.unbounded_send(unknown_block.header().clone())
.unwrap();
Delay::new(Duration::from_millis(100)).await;
}
finalized_sender
.unbounded_send(block.header().clone())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.finalized_hash {
break;
}
}
};
block_on(async move {
futures::pin_mut!(consensus);
futures::pin_mut!(work);
select! {
r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
_ = work.fuse() => {},
}
});
}
// It can happen that we first import a relay chain block, while not yet having the parachain
// block imported that would be set to the best block. We need to make sure to import this
// block as new best block in the moment it is imported.
#[test]
fn follow_new_best_sets_best_after_it_is_imported() {
sp_tracing::try_init_simple();
let mut client = Arc::new(TestClientBuilder::default().build());
let block = build_and_import_block(client.clone());
let unknown_block = {
let block_builder = client.init_block_builder_at(&BlockId::Hash(block.hash()), None);
block_builder.build().unwrap().block
};
let relay_chain = Relaychain::new();
let new_best_heads_sender = relay_chain
.inner
.lock()
.unwrap()
.new_best_heads_sender
.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
new_best_heads_sender
.unbounded_send(block.header().clone())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.best_hash {
break;
}
}
// Announce the unknown block
new_best_heads_sender
.unbounded_send(unknown_block.header().clone())
.unwrap();
// Do some iterations. As this is a local task executor, only one task can run at a time.
// Meaning that it should already have processed the unknown block.
for _ in 0..3usize {
Delay::new(Duration::from_millis(100)).await;
}
let (header, body) = unknown_block.clone().deconstruct();
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(false));
block_import_params.body = Some(body);
// Now import the unkown block to make it "known"
client
.import_block(block_import_params, Default::default())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if unknown_block.hash() == client.usage_info().chain.best_hash {
break;
}
}
};
block_on(async move {
futures::pin_mut!(consensus);
futures::pin_mut!(work);
select! {
r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
_ = work.fuse() => {},
}
});
}
}
+2 -2
View File
@@ -33,8 +33,8 @@ use sp_std::{cmp, prelude::*};
use cumulus_primitives::{ use cumulus_primitives::{
inherents::{MessageIngestionType, MESSAGE_INGESTION_IDENTIFIER}, inherents::{MessageIngestionType, MESSAGE_INGESTION_IDENTIFIER},
well_known_keys, DownwardMessageHandler, HrmpMessageHandler, OutboundHrmpMessage, ParaId, well_known_keys, DownwardMessageHandler, HrmpMessageHandler, HrmpMessageSender,
UpwardMessage, UpwardMessageSender, HrmpMessageSender, OutboundHrmpMessage, ParaId, UpwardMessage, UpwardMessageSender,
}; };
// TODO: these should be not a constant, but sourced from the relay-chain configuration. // TODO: these should be not a constant, but sourced from the relay-chain configuration.
+5 -2
View File
@@ -498,10 +498,13 @@ mod tests {
let inherent_data = { let inherent_data = {
let mut inherent_data = InherentData::default(); let mut inherent_data = InherentData::default();
inherent_data inherent_data
.put_data(INHERENT_IDENTIFIER, &ValidationDataType { .put_data(
INHERENT_IDENTIFIER,
&ValidationDataType {
validation_data: vfp.clone(), validation_data: vfp.clone(),
relay_chain_state: sp_state_machine::StorageProof::empty(), relay_chain_state: sp_state_machine::StorageProof::empty(),
}) },
)
.expect("failed to put VFP inherent"); .expect("failed to put VFP inherent");
inherent_data inherent_data
}; };
+8 -9
View File
@@ -55,15 +55,15 @@ pub use sp_runtime::{Perbill, Permill};
// XCM imports // XCM imports
use polkadot_parachain::primitives::Sibling; use polkadot_parachain::primitives::Sibling;
use xcm::v0::{MultiLocation, NetworkId, Junction}; use xcm::v0::{Junction, MultiLocation, NetworkId};
use xcm_builder::{ use xcm_builder::{
ParentIsDefault, SiblingParachainConvertsVia, AccountId32Aliases, LocationInverter, AccountId32Aliases, CurrencyAdapter, LocationInverter, ParentIsDefault, RelayChainAsNative,
SovereignSignedViaLocation, SiblingParachainAsNative, SiblingParachainAsNative, SiblingParachainConvertsVia, SignedAccountId32AsNative,
RelayChainAsNative, SignedAccountId32AsNative, CurrencyAdapter SovereignSignedViaLocation,
}; };
use xcm_executor::{ use xcm_executor::{
XcmExecutor, Config, traits::{IsConcrete, NativeAsset},
traits::{NativeAsset, IsConcrete}, Config, XcmExecutor,
}; };
pub type SessionHandlers = (); pub type SessionHandlers = ();
@@ -256,8 +256,7 @@ type LocationConverter = (
AccountId32Aliases<RococoNetwork, AccountId>, AccountId32Aliases<RococoNetwork, AccountId>,
); );
type LocalAssetTransactor = type LocalAssetTransactor = CurrencyAdapter<
CurrencyAdapter<
// Use this currency: // Use this currency:
Balances, Balances,
// Use this currency when it is a fungible asset matching the given location or name: // Use this currency when it is a fungible asset matching the given location or name:
@@ -266,7 +265,7 @@ type LocalAssetTransactor =
LocationConverter, LocationConverter,
// Our chain's account ID type (we can't get away without mentioning it explicitly): // Our chain's account ID type (we can't get away without mentioning it explicitly):
AccountId, AccountId,
>; >;
type LocalOriginConverter = ( type LocalOriginConverter = (
SovereignSignedViaLocation<LocationConverter, Origin>, SovereignSignedViaLocation<LocationConverter, Origin>,
-1
View File
@@ -28,7 +28,6 @@ sp-externalities = { git = "https://github.com/paritytech/substrate", default-fe
parachain = { package = "polkadot-parachain", git = "https://github.com/paritytech/polkadot", default-features = false, features = [ "wasm-api" ] , branch = "master" } parachain = { package = "polkadot-parachain", git = "https://github.com/paritytech/polkadot", default-features = false, features = [ "wasm-api" ] , branch = "master" }
[dev-dependencies] [dev-dependencies]
sc-block-builder = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" } sc-client-api = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
+6 -14
View File
@@ -18,13 +18,11 @@ use crate::ParachainBlockData;
use cumulus_primitives::{PersistedValidationData, ValidationData}; use cumulus_primitives::{PersistedValidationData, ValidationData};
use cumulus_test_client::{ use cumulus_test_client::{
generate_block_inherents,
runtime::{Block, Hash, Header, UncheckedExtrinsic, WASM_BINARY}, runtime::{Block, Hash, Header, UncheckedExtrinsic, WASM_BINARY},
transfer, Client, DefaultTestClientBuilderExt, LongestChain, TestClientBuilder, transfer, Client, DefaultTestClientBuilderExt, InitBlockBuilder, LongestChain,
TestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
}; };
use parachain::primitives::{BlockData, HeadData, ValidationParams, ValidationResult}; use parachain::primitives::{BlockData, HeadData, ValidationParams, ValidationResult};
use sc_block_builder::BlockBuilderProvider;
use sc_executor::{ use sc_executor::{
error::Result, sp_wasm_interface::HostFunctions, WasmExecutionMethod, WasmExecutor, error::Result, sp_wasm_interface::HostFunctions, WasmExecutionMethod, WasmExecutor,
}; };
@@ -89,12 +87,8 @@ fn build_block_with_proof(
parent_head: Header, parent_head: Header,
) -> (Block, sp_trie::StorageProof) { ) -> (Block, sp_trie::StorageProof) {
let block_id = BlockId::Hash(client.info().best_hash); let block_id = BlockId::Hash(client.info().best_hash);
let mut builder = client let mut builder = client.init_block_builder_at(
.new_block_at(&block_id, Default::default(), true) &block_id,
.expect("Initializes new block");
generate_block_inherents(
client,
Some(ValidationData { Some(ValidationData {
persisted: PersistedValidationData { persisted: PersistedValidationData {
block_number: 1, block_number: 1,
@@ -103,13 +97,11 @@ fn build_block_with_proof(
}, },
..Default::default() ..Default::default()
}), }),
) );
.into_iter()
.for_each(|e| builder.push(e).expect("Pushes an inherent"));
extra_extrinsics extra_extrinsics
.into_iter() .into_iter()
.for_each(|e| builder.push(e).expect("Pushes an extrinsic")); .for_each(|e| builder.push(e).unwrap());
let built_block = builder.build().expect("Creates block"); let built_block = builder.build().expect("Creates block");
+1
View File
@@ -28,3 +28,4 @@ polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "
# Other deps # Other deps
futures = "0.3.6" futures = "0.3.6"
tracing = "0.1.22"
+47 -24
View File
@@ -23,7 +23,9 @@ use futures::{Future, FutureExt};
use polkadot_overseer::OverseerHandler; use polkadot_overseer::OverseerHandler;
use polkadot_primitives::v1::{Block as PBlock, CollatorId, CollatorPair}; use polkadot_primitives::v1::{Block as PBlock, CollatorId, CollatorPair};
use polkadot_service::{AbstractClient, Client as PClient, ClientHandle, RuntimeApiCollection}; use polkadot_service::{AbstractClient, Client as PClient, ClientHandle, RuntimeApiCollection};
use sc_client_api::{Backend as BackendT, BlockBackend, Finalizer, UsageProvider, StateBackend}; use sc_client_api::{
Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, StateBackend, UsageProvider,
};
use sc_service::{error::Result as ServiceResult, Configuration, Role, TaskManager}; use sc_service::{error::Result as ServiceResult, Configuration, Role, TaskManager};
use sp_blockchain::HeaderBackend; use sp_blockchain::HeaderBackend;
use sp_consensus::{BlockImport, Environment, Error as ConsensusError, Proposer}; use sp_consensus::{BlockImport, Environment, Error as ConsensusError, Proposer};
@@ -36,7 +38,18 @@ use std::{marker::PhantomData, sync::Arc};
type PFullNode<C> = polkadot_service::NewFull<C>; type PFullNode<C> = polkadot_service::NewFull<C>;
/// Parameters given to [`start_collator`]. /// Parameters given to [`start_collator`].
pub struct StartCollatorParams<'a, Block: BlockT, PF, BI, BS, Client, Backend, Spawner, PClient, PBackend> { pub struct StartCollatorParams<
'a,
Block: BlockT,
PF,
BI,
BS,
Client,
Backend,
Spawner,
PClient,
PBackend,
> {
pub proposer_factory: PF, pub proposer_factory: PF,
pub inherent_data_providers: InherentDataProviders, pub inherent_data_providers: InherentDataProviders,
pub backend: Arc<Backend>, pub backend: Arc<Backend>,
@@ -91,6 +104,7 @@ where
+ Send + Send
+ Sync + Sync
+ BlockBackend<Block> + BlockBackend<Block>
+ BlockchainEvents<Block>
+ 'static, + 'static,
for<'b> &'b Client: BlockImport<Block>, for<'b> &'b Client: BlockImport<Block>,
Backend: BackendT<Block> + 'static, Backend: BackendT<Block> + 'static,
@@ -99,13 +113,20 @@ where
PBackend: BackendT<PBlock> + 'static, PBackend: BackendT<PBlock> + 'static,
PBackend::State: StateBackend<BlakeTwo256>, PBackend::State: StateBackend<BlakeTwo256>,
{ {
polkadot_full_node.client.execute_with(StartConsensus {
para_id,
announce_block: announce_block.clone(),
client: client.clone(),
task_manager,
_phantom: PhantomData,
})?;
polkadot_full_node polkadot_full_node
.client .client
.execute_with(StartCollator { .execute_with(StartCollator {
proposer_factory, proposer_factory,
inherent_data_providers, inherent_data_providers,
backend, backend,
client,
announce_block, announce_block,
overseer_handler: polkadot_full_node overseer_handler: polkadot_full_node
.overseer_handler .overseer_handler
@@ -124,13 +145,12 @@ where
Ok(()) Ok(())
} }
struct StartCollator<Block: BlockT, Client, Backend, PF, BI, BS, Spawner, PBackend> { struct StartCollator<Block: BlockT, Backend, PF, BI, BS, Spawner, PBackend> {
proposer_factory: PF, proposer_factory: PF,
inherent_data_providers: InherentDataProviders, inherent_data_providers: InherentDataProviders,
backend: Arc<Backend>, backend: Arc<Backend>,
block_import: BI, block_import: BI,
block_status: Arc<BS>, block_status: Arc<BS>,
client: Arc<Client>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>, announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
overseer_handler: OverseerHandler, overseer_handler: OverseerHandler,
spawner: Spawner, spawner: Spawner,
@@ -139,8 +159,8 @@ struct StartCollator<Block: BlockT, Client, Backend, PF, BI, BS, Spawner, PBacke
polkadot_backend: Arc<PBackend>, polkadot_backend: Arc<PBackend>,
} }
impl<Block, Client, Backend, PF, BI, BS, Spawner, PBackend2> polkadot_service::ExecuteWithClient impl<Block, Backend, PF, BI, BS, Spawner, PBackend2> polkadot_service::ExecuteWithClient
for StartCollator<Block, Client, Backend, PF, BI, BS, Spawner, PBackend2> for StartCollator<Block, Backend, PF, BI, BS, Spawner, PBackend2>
where where
Block: BlockT, Block: BlockT,
PF: Environment<Block> + Send + 'static, PF: Environment<Block> + Send + 'static,
@@ -152,14 +172,6 @@ where
+ Sync + Sync
+ 'static, + 'static,
BS: BlockBackend<Block> + Send + Sync + 'static, BS: BlockBackend<Block> + Send + Sync + 'static,
Client: Finalizer<Block, Backend>
+ UsageProvider<Block>
+ HeaderBackend<Block>
+ Send
+ Sync
+ BlockBackend<Block>
+ 'static,
for<'b> &'b Client: BlockImport<Block>,
Backend: BackendT<Block> + 'static, Backend: BackendT<Block> + 'static,
Spawner: SpawnNamed + Clone + Send + Sync + 'static, Spawner: SpawnNamed + Clone + Send + Sync + 'static,
PBackend2: sc_client_api::Backend<PBlock> + 'static, PBackend2: sc_client_api::Backend<PBlock> + 'static,
@@ -182,7 +194,6 @@ where
backend: self.backend, backend: self.backend,
block_import: self.block_import, block_import: self.block_import,
block_status: self.block_status, block_status: self.block_status,
client: self.client,
announce_block: self.announce_block, announce_block: self.announce_block,
overseer_handler: self.overseer_handler, overseer_handler: self.overseer_handler,
spawner: self.spawner, spawner: self.spawner,
@@ -227,12 +238,13 @@ where
+ Send + Send
+ Sync + Sync
+ BlockBackend<Block> + BlockBackend<Block>
+ BlockchainEvents<Block>
+ 'static, + 'static,
for<'a> &'a Client: BlockImport<Block>, for<'a> &'a Client: BlockImport<Block>,
Backend: BackendT<Block> + 'static, Backend: BackendT<Block> + 'static,
PClient: ClientHandle, PClient: ClientHandle,
{ {
polkadot_full_node.client.execute_with(StartFullNode { polkadot_full_node.client.execute_with(StartConsensus {
announce_block, announce_block,
para_id, para_id,
client, client,
@@ -245,7 +257,7 @@ where
Ok(()) Ok(())
} }
struct StartFullNode<'a, Block: BlockT, Client, Backend> { struct StartConsensus<'a, Block: BlockT, Client, Backend> {
para_id: ParaId, para_id: ParaId,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>, announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
client: Arc<Client>, client: Arc<Client>,
@@ -254,7 +266,7 @@ struct StartFullNode<'a, Block: BlockT, Client, Backend> {
} }
impl<'a, Block, Client, Backend> polkadot_service::ExecuteWithClient impl<'a, Block, Client, Backend> polkadot_service::ExecuteWithClient
for StartFullNode<'a, Block, Client, Backend> for StartConsensus<'a, Block, Client, Backend>
where where
Block: BlockT, Block: BlockT,
Client: Finalizer<Block, Backend> Client: Finalizer<Block, Backend>
@@ -262,6 +274,7 @@ where
+ Send + Send
+ Sync + Sync
+ BlockBackend<Block> + BlockBackend<Block>
+ BlockchainEvents<Block>
+ 'static, + 'static,
for<'b> &'b Client: BlockImport<Block>, for<'b> &'b Client: BlockImport<Block>,
Backend: BackendT<Block> + 'static, Backend: BackendT<Block> + 'static,
@@ -276,15 +289,25 @@ where
Api: RuntimeApiCollection<StateBackend = PBackend::State>, Api: RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: AbstractClient<PBlock, PBackend, Api = Api> + 'static, PClient: AbstractClient<PBlock, PBackend, Api = Api> + 'static,
{ {
let future = cumulus_consensus::follow_polkadot( let consensus = cumulus_consensus::run_parachain_consensus(
self.para_id, self.para_id,
self.client, self.client,
client, client,
self.announce_block, self.announce_block,
)?; );
self.task_manager
.spawn_essential_handle() self.task_manager.spawn_essential_handle().spawn(
.spawn("cumulus-consensus", future); "cumulus-consensus",
consensus.then(|r| async move {
if let Err(e) = r {
tracing::error!(
target: "cumulus-service",
error = %e,
"Parachain consensus failed.",
)
}
}),
);
Ok(()) Ok(())
} }
+59 -25
View File
@@ -14,31 +14,63 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>. // along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use crate::Client; use crate::{Backend, Client};
use cumulus_primitives::{inherents::{VALIDATION_DATA_IDENTIFIER, ValidationDataType}, ValidationData}; use cumulus_primitives::{
use cumulus_test_runtime::GetLastTimestamp; inherents::{ValidationDataType, VALIDATION_DATA_IDENTIFIER},
ValidationData,
};
use cumulus_test_runtime::{Block, GetLastTimestamp};
use polkadot_primitives::v1::BlockNumber as PBlockNumber; use polkadot_primitives::v1::BlockNumber as PBlockNumber;
use sc_block_builder::BlockBuilderApi; use sc_block_builder::{BlockBuilder, BlockBuilderProvider};
use sp_api::ProvideRuntimeApi; use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_core::ExecutionContext;
use sp_runtime::generic::BlockId; use sp_runtime::generic::BlockId;
/// Generate the inherents required by the test runtime. /// An extension for the Cumulus test client to init a block builder.
/// pub trait InitBlockBuilder {
/// - `validation_data`: The [`ValidationData`] that will be passed as inherent /// Init a specific block builder that works for the test runtime.
/// data into the runtime when building the inherents. If ///
/// `None` is passed, the default value will be used. /// This will automatically create and push the inherents for you to make the block
pub fn generate_block_inherents( /// valid for the test runtime.
client: &Client, fn init_block_builder(
&self,
validation_data: Option<ValidationData<PBlockNumber>>, validation_data: Option<ValidationData<PBlockNumber>>,
) -> Vec<cumulus_test_runtime::UncheckedExtrinsic> { ) -> sc_block_builder::BlockBuilder<Block, Client, Backend>;
/// Init a specific block builder at a specific block that works for the test runtime.
///
/// Same as [`InitBlockBuilder::init_block_builder`] besides that it takes a
/// [`BlockId`] to say which should be the parent block of the block that is being build.
fn init_block_builder_at(
&self,
at: &BlockId<Block>,
validation_data: Option<ValidationData<PBlockNumber>>,
) -> sc_block_builder::BlockBuilder<Block, Client, Backend>;
}
impl InitBlockBuilder for Client {
fn init_block_builder(
&self,
validation_data: Option<ValidationData<PBlockNumber>>,
) -> BlockBuilder<Block, Client, Backend> {
let chain_info = self.chain_info();
self.init_block_builder_at(&BlockId::Hash(chain_info.best_hash), validation_data)
}
fn init_block_builder_at(
&self,
at: &BlockId<Block>,
validation_data: Option<ValidationData<PBlockNumber>>,
) -> BlockBuilder<Block, Client, Backend> {
let mut block_builder = self
.new_block_at(at, Default::default(), true)
.expect("Creates new block builder for test runtime");
let mut inherent_data = sp_inherents::InherentData::new(); let mut inherent_data = sp_inherents::InherentData::new();
let block_id = BlockId::Hash(client.info().best_hash); let last_timestamp = self
let last_timestamp = client
.runtime_api() .runtime_api()
.get_last_timestamp(&block_id) .get_last_timestamp(&at)
.expect("Get last timestamp"); .expect("Get last timestamp");
let timestamp = last_timestamp + cumulus_test_runtime::MinimumPeriod::get(); let timestamp = last_timestamp + cumulus_test_runtime::MinimumPeriod::get();
inherent_data inherent_data
@@ -54,12 +86,14 @@ pub fn generate_block_inherents(
) )
.expect("Put validation function params failed"); .expect("Put validation function params failed");
client let inherents = block_builder
.runtime_api() .create_inherents(inherent_data)
.inherent_extrinsics_with_context( .expect("Creates inherents");
&BlockId::number(0),
ExecutionContext::BlockConstruction, inherents
inherent_data, .into_iter()
) .for_each(|ext| block_builder.push(ext).expect("Pushes inherent"));
.expect("Get inherents failed")
block_builder
}
} }
+15 -13
View File
@@ -22,20 +22,17 @@
#![cfg_attr(not(feature = "std"), no_std)] #![cfg_attr(not(feature = "std"), no_std)]
use codec::{Encode, Decode}; use codec::{Decode, Encode};
use sp_std::convert::{TryFrom, TryInto};
use frame_support::{
decl_module, decl_event, decl_error,
sp_runtime::traits::Hash,
};
use frame_system::ensure_root;
use cumulus_primitives::{ use cumulus_primitives::{
ParaId, InboundHrmpMessage, InboundDownwardMessage, OutboundHrmpMessage, DownwardMessageHandler, HrmpMessageHandler, HrmpMessageSender, InboundDownwardMessage,
DownwardMessageHandler, HrmpMessageHandler, UpwardMessageSender, HrmpMessageSender, InboundHrmpMessage, OutboundHrmpMessage, ParaId, UpwardMessageSender,
}; };
use frame_support::{decl_error, decl_event, decl_module, sp_runtime::traits::Hash};
use frame_system::ensure_root;
use sp_std::convert::{TryFrom, TryInto};
use xcm::{ use xcm::{
v0::{Error as XcmError, ExecuteXcm, Junction, MultiLocation, SendXcm, Xcm},
VersionedXcm, VersionedXcm,
v0::{Xcm, MultiLocation, Error as XcmError, Junction, SendXcm, ExecuteXcm}
}; };
pub trait Config: frame_system::Config { pub trait Config: frame_system::Config {
@@ -125,7 +122,10 @@ impl<T: Config> HrmpMessageHandler for Module<T> {
frame_support::debug::print!("Processing HRMP XCM: {:?}", &hash); frame_support::debug::print!("Processing HRMP XCM: {:?}", &hash);
match VersionedXcm::decode(&mut &msg.data[..]).map(Xcm::try_from) { match VersionedXcm::decode(&mut &msg.data[..]).map(Xcm::try_from) {
Ok(Ok(xcm)) => { Ok(Ok(xcm)) => {
match T::XcmExecutor::execute_xcm(Junction::Parachain { id: sender.into() }.into(), xcm) { match T::XcmExecutor::execute_xcm(
Junction::Parachain { id: sender.into() }.into(),
xcm,
) {
Ok(..) => RawEvent::Success(hash), Ok(..) => RawEvent::Success(hash),
Err(e) => RawEvent::Fail(hash, e), Err(e) => RawEvent::Fail(hash, e),
}; };
@@ -151,7 +151,8 @@ impl<T: Config> SendXcm for Module<T> {
let data = msg.encode(); let data = msg.encode();
let hash = T::Hashing::hash(&data); let hash = T::Hashing::hash(&data);
T::UpwardMessageSender::send_upward_message(data).map_err(|_| XcmError::Undefined)?; T::UpwardMessageSender::send_upward_message(data)
.map_err(|_| XcmError::Undefined)?;
Self::deposit_event(RawEvent::UpwardMessageSent(hash)); Self::deposit_event(RawEvent::UpwardMessageSent(hash));
Ok(()) Ok(())
@@ -165,7 +166,8 @@ impl<T: Config> SendXcm for Module<T> {
data, data,
}; };
// TODO: Better error here // TODO: Better error here
T::HrmpMessageSender::send_hrmp_message(message).map_err(|_| XcmError::Undefined)?; T::HrmpMessageSender::send_hrmp_message(message)
.map_err(|_| XcmError::Undefined)?;
Self::deposit_event(RawEvent::HrmpMessageSent(hash)); Self::deposit_event(RawEvent::HrmpMessageSent(hash));
Ok(()) Ok(())
} }