The great refactor (#325)

* Move pallets to pallets folder and rename them

* Move genesis file to service

* Rename primitives to primitives-core

* Delete cumulus-runtime

* Move stuff to client folder and rename
This commit is contained in:
Bastian Köcher
2021-02-10 13:07:21 +01:00
committed by GitHub
parent a4998998a9
commit 119e0859b9
48 changed files with 436 additions and 547 deletions
+53
View File
@@ -0,0 +1,53 @@
[package]
name = "cumulus-client-collator"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
# Substrate dependencies
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-cli = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot dependencies
polkadot-service = { git = "https://github.com/paritytech/polkadot", features = [ "real-overseer" ], branch = "master" }
polkadot-parachain = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-node-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Cumulus dependencies
cumulus-client-network = { path = "../network" }
cumulus-primitives-core = { path = "../../primitives/core" }
# Other dependencies
log = "0.4.8"
codec = { package = "parity-scale-codec", version = "2.0.0", features = [ "derive" ] }
futures = { version = "0.3.1", features = ["compat"] }
parking_lot = "0.9"
[dev-dependencies]
# Cumulus dependencies
cumulus-test-runtime = { path = "../../test/runtime" }
cumulus-test-client = { path = "../../test/client" }
# Substrate dependencies
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
substrate-test-client = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot dependencies
polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-node-subsystem-test-helpers = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Other dependencies
env_logger = "0.7.1"
+895
View File
@@ -0,0 +1,895 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
//! Cumulus Collator implementation for Substrate.
use cumulus_client_network::WaitToAnnounce;
use cumulus_primitives_core::{
inherents, ParachainBlockData,
well_known_keys, InboundDownwardMessage, InboundHrmpMessage, OutboundHrmpMessage,
PersistedValidationData, relay_chain,
};
use sc_client_api::{BlockBackend, StateBackend};
use sp_consensus::{
BlockImport, BlockImportParams, BlockOrigin, BlockStatus, Environment, Error as ConsensusError,
ForkChoiceStrategy, Proposal, Proposer, RecordProof,
};
use sp_core::traits::SpawnNamed;
use sp_inherents::{InherentData, InherentDataProviders};
use sp_runtime::{
generic::BlockId,
traits::{BlakeTwo256, Block as BlockT, Header as HeaderT, Zero},
};
use sp_state_machine::InspectState;
use polkadot_node_primitives::{Collation, CollationGenerationConfig};
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
use polkadot_overseer::OverseerHandler;
use polkadot_primitives::v1::{
Block as PBlock, BlockData, BlockNumber as PBlockNumber, CollatorPair, Hash as PHash, HeadData,
Id as ParaId, PoV, UpwardMessage, HrmpChannelId,
};
use polkadot_service::RuntimeApiCollection;
use codec::{Decode, Encode};
use log::{debug, error, info, trace};
use futures::prelude::*;
use std::{collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration};
use parking_lot::Mutex;
type TransactionFor<E, Block> =
<<E as Environment<Block>>::Proposer as Proposer<Block>>::Transaction;
/// The logging target.
const LOG_TARGET: &str = "cumulus-collator";
/// The implementation of the Cumulus `Collator`.
pub struct Collator<Block: BlockT, PF, BI, BS, Backend, PBackend, PClient, PBackend2> {
para_id: ParaId,
proposer_factory: Arc<Mutex<PF>>,
_phantom: PhantomData<(Block, PBackend)>,
inherent_data_providers: InherentDataProviders,
block_import: Arc<Mutex<BI>>,
block_status: Arc<BS>,
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
backend: Arc<Backend>,
polkadot_client: Arc<PClient>,
polkadot_backend: Arc<PBackend2>,
}
impl<Block: BlockT, PF, BI, BS, Backend, PBackend, PClient, PBackend2> Clone
for Collator<Block, PF, BI, BS, Backend, PBackend, PClient, PBackend2>
{
fn clone(&self) -> Self {
Self {
para_id: self.para_id.clone(),
proposer_factory: self.proposer_factory.clone(),
inherent_data_providers: self.inherent_data_providers.clone(),
_phantom: PhantomData,
block_import: self.block_import.clone(),
block_status: self.block_status.clone(),
wait_to_announce: self.wait_to_announce.clone(),
backend: self.backend.clone(),
polkadot_client: self.polkadot_client.clone(),
polkadot_backend: self.polkadot_backend.clone(),
}
}
}
impl<Block, PF, BI, BS, Backend, PBackend, PApi, PClient, PBackend2>
Collator<Block, PF, BI, BS, Backend, PBackend, PClient, PBackend2>
where
Block: BlockT,
PF: Environment<Block> + 'static + Send,
PF::Proposer: Send,
BI: BlockImport<
Block,
Error = ConsensusError,
Transaction = <PF::Proposer as Proposer<Block>>::Transaction,
> + Send
+ Sync
+ 'static,
BS: BlockBackend<Block>,
Backend: sc_client_api::Backend<Block> + 'static,
PBackend: sc_client_api::Backend<PBlock> + 'static,
PBackend::State: StateBackend<BlakeTwo256>,
PApi: RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: polkadot_service::AbstractClient<PBlock, PBackend, Api = PApi> + 'static,
PBackend2: sc_client_api::Backend<PBlock> + 'static,
PBackend2::State: StateBackend<BlakeTwo256>,
{
/// Create a new instance.
fn new(
para_id: ParaId,
proposer_factory: PF,
inherent_data_providers: InherentDataProviders,
overseer_handler: OverseerHandler,
block_import: BI,
block_status: Arc<BS>,
spawner: Arc<dyn SpawnNamed + Send + Sync>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
backend: Arc<Backend>,
polkadot_client: Arc<PClient>,
polkadot_backend: Arc<PBackend2>,
) -> Self {
let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(
spawner,
announce_block,
overseer_handler,
)));
Self {
para_id,
proposer_factory: Arc::new(Mutex::new(proposer_factory)),
inherent_data_providers,
_phantom: PhantomData,
block_import: Arc::new(Mutex::new(block_import)),
block_status,
wait_to_announce,
backend,
polkadot_client,
polkadot_backend,
}
}
/// Returns the whole contents of the downward message queue for the parachain we are collating
/// for.
///
/// Returns `None` in case of an error.
fn retrieve_dmq_contents(&self, relay_parent: PHash) -> Option<Vec<InboundDownwardMessage>> {
self.polkadot_client
.runtime_api()
.dmq_contents_with_context(
&BlockId::hash(relay_parent),
sp_core::ExecutionContext::Importing,
self.para_id,
)
.map_err(|e| {
error!(
target: LOG_TARGET,
"An error occured during requesting the downward messages for {}: {:?}",
relay_parent, e,
);
})
.ok()
}
/// Returns channels contents for each inbound HRMP channel addressed to the parachain we are
/// collating for.
///
/// Empty channels are also included.
fn retrieve_all_inbound_hrmp_channel_contents(
&self,
relay_parent: PHash,
) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
self.polkadot_client
.runtime_api()
.inbound_hrmp_channels_contents_with_context(
&BlockId::hash(relay_parent),
sp_core::ExecutionContext::Importing,
self.para_id,
)
.map_err(|e| {
error!(
target: LOG_TARGET,
"An error occured during requesting the inbound HRMP messages for {}: {:?}",
relay_parent, e,
);
})
.ok()
}
/// Collect the relevant relay chain state in form of a proof for putting it into the validation
/// data inherent.
fn collect_relay_storage_proof(
&self,
relay_parent: PHash,
) -> Option<sp_state_machine::StorageProof> {
use relay_chain::well_known_keys as relay_well_known_keys;
let relay_parent_state_backend = self
.polkadot_backend
.state_at(BlockId::Hash(relay_parent))
.map_err(|e| {
error!(
target: LOG_TARGET,
"Cannot obtain the state of the relay chain at `{:?}`: {:?}",
relay_parent,
e,
)
})
.ok()?;
let ingress_channels = relay_parent_state_backend
.storage(&relay_well_known_keys::hrmp_ingress_channel_index(
self.para_id,
))
.map_err(|e| {
error!(
target: LOG_TARGET,
"Cannot obtain the hrmp ingress channel index: {:?}",
e,
)
})
.ok()?;
let ingress_channels = ingress_channels
.map(|raw| <Vec<ParaId>>::decode(&mut &raw[..]))
.transpose()
.map_err(|e| {
error!(
target: LOG_TARGET,
"Cannot decode the hrmp ingress channel index: {:?}",
e,
)
})
.ok()?
.unwrap_or_default();
let egress_channels = relay_parent_state_backend
.storage(&relay_well_known_keys::hrmp_egress_channel_index(
self.para_id,
))
.map_err(|e| {
error!(
target: LOG_TARGET,
"Cannot obtain the hrmp egress channel index: {:?}",
e,
)
})
.ok()?;
let egress_channels = egress_channels
.map(|raw| <Vec<ParaId>>::decode(&mut &raw[..]))
.transpose()
.map_err(|e| {
error!(
target: LOG_TARGET,
"Cannot decode the hrmp egress channel index: {:?}",
e,
)
})
.ok()?
.unwrap_or_default();
let mut relevant_keys = vec![];
relevant_keys.push(relay_well_known_keys::ACTIVE_CONFIG.to_vec());
relevant_keys.push(relay_well_known_keys::dmq_mqc_head(self.para_id));
relevant_keys.push(relay_well_known_keys::relay_dispatch_queue_size(
self.para_id,
));
relevant_keys.push(relay_well_known_keys::hrmp_ingress_channel_index(
self.para_id,
));
relevant_keys.push(relay_well_known_keys::hrmp_egress_channel_index(
self.para_id,
));
relevant_keys.extend(ingress_channels.into_iter().map(|sender| {
relay_well_known_keys::hrmp_channels(HrmpChannelId {
sender,
recipient: self.para_id,
})
}));
relevant_keys.extend(egress_channels.into_iter().map(|recipient| {
relay_well_known_keys::hrmp_channels(HrmpChannelId {
sender: self.para_id,
recipient,
})
}));
sp_state_machine::prove_read(relay_parent_state_backend, relevant_keys)
.map_err(|e| {
error!(
target: LOG_TARGET,
"Failed to collect required relay chain state storage proof at `{:?}`: {:?}",
relay_parent,
e,
)
})
.ok()
}
/// Get the inherent data with validation function parameters injected
fn inherent_data(
&mut self,
validation_data: &PersistedValidationData,
relay_parent: PHash,
) -> Option<InherentData> {
let mut inherent_data = self
.inherent_data_providers
.create_inherent_data()
.map_err(|e| {
error!(
target: LOG_TARGET,
"Failed to create inherent data: {:?}",
e,
)
})
.ok()?;
let system_inherent_data = {
let relay_chain_state = self.collect_relay_storage_proof(relay_parent)?;
let downward_messages = self.retrieve_dmq_contents(relay_parent)?;
let horizontal_messages =
self.retrieve_all_inbound_hrmp_channel_contents(relay_parent)?;
inherents::SystemInherentData {
downward_messages,
horizontal_messages,
validation_data: validation_data.clone(),
relay_chain_state,
}
};
inherent_data
.put_data(
inherents::SYSTEM_INHERENT_IDENTIFIER,
&system_inherent_data,
)
.map_err(|e| {
error!(
target: LOG_TARGET,
"Failed to put the system inherent into inherent data: {:?}",
e,
)
})
.ok()?;
Some(inherent_data)
}
/// Checks the status of the given block hash in the Parachain.
///
/// Returns `true` if the block could be found and is good to be build on.
fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
match self.block_status.block_status(&BlockId::Hash(hash)) {
Ok(BlockStatus::Queued) => {
debug!(
target: LOG_TARGET,
"Skipping candidate production, because block `{:?}` is still queued for import.",
hash,
);
false
}
Ok(BlockStatus::InChainWithState) => true,
Ok(BlockStatus::InChainPruned) => {
error!(
target: LOG_TARGET,
"Skipping candidate production, because block `{:?}` is already pruned!",
hash,
);
false
}
Ok(BlockStatus::KnownBad) => {
error!(
target: LOG_TARGET,
"Block `{}` is tagged as known bad and is included in the relay chain! Skipping candidate production!",
hash,
);
false
}
Ok(BlockStatus::Unknown) => {
if header.number().is_zero() {
error!(
target: LOG_TARGET,
"Could not find the header `{:?}` of the genesis block in the database!",
hash,
);
} else {
debug!(
target: LOG_TARGET,
"Skipping candidate production, because block `{:?}` is unknown.",
hash,
);
}
false
}
Err(e) => {
error!(
target: LOG_TARGET,
"Failed to get block status of `{:?}`: {:?}",
hash,
e,
);
false
}
}
}
fn build_collation(
&mut self,
block: ParachainBlockData<Block>,
block_hash: Block::Hash,
relay_block_number: PBlockNumber,
) -> Option<Collation> {
let block_data = BlockData(block.encode());
let header = block.into_header();
let head_data = HeadData(header.encode());
let state = match self.backend.state_at(BlockId::Hash(block_hash)) {
Ok(state) => state,
Err(e) => {
error!(target: LOG_TARGET, "Failed to get state of the freshly built block: {:?}", e);
return None;
}
};
state.inspect_state(|| {
let upward_messages = sp_io::storage::get(well_known_keys::UPWARD_MESSAGES);
let upward_messages = match upward_messages.map(|v| Vec::<UpwardMessage>::decode(&mut &v[..])) {
Some(Ok(msgs)) => msgs,
Some(Err(e)) => {
error!(target: LOG_TARGET, "Failed to decode upward messages from the build block: {:?}", e);
return None
},
None => Vec::new(),
};
let new_validation_code = sp_io::storage::get(well_known_keys::NEW_VALIDATION_CODE);
let processed_downward_messages = sp_io::storage::get(well_known_keys::PROCESSED_DOWNWARD_MESSAGES);
let processed_downward_messages = match processed_downward_messages
.map(|v| u32::decode(&mut &v[..]))
{
Some(Ok(processed_cnt)) => processed_cnt,
Some(Err(e)) => {
error!(
target: LOG_TARGET,
"Failed to decode the count of processed downward messages: {:?}",
e
);
return None
}
None => 0,
};
let horizontal_messages = sp_io::storage::get(well_known_keys::HRMP_OUTBOUND_MESSAGES);
let horizontal_messages = match horizontal_messages
.map(|v| Vec::<OutboundHrmpMessage>::decode(&mut &v[..]))
{
Some(Ok(horizontal_messages)) => horizontal_messages,
Some(Err(e)) => {
error!(
target: LOG_TARGET,
"Failed to decode the horizontal messages: {:?}",
e
);
return None
}
None => Vec::new(),
};
let hrmp_watermark = sp_io::storage::get(well_known_keys::HRMP_WATERMARK);
let hrmp_watermark = match hrmp_watermark.map(|v| PBlockNumber::decode(&mut &v[..])) {
Some(Ok(hrmp_watermark)) => hrmp_watermark,
Some(Err(e)) => {
error!(
target: LOG_TARGET,
"Failed to decode the HRMP watermark: {:?}",
e
);
return None
}
None => {
// If the runtime didn't set `HRMP_WATERMARK`, then it means no messages were
// supplied via the message ingestion inherent. Assuming that the PVF/runtime
// checks that legitly there are no pending messages we can therefore move the
// watermark up to the relay-block number.
relay_block_number
}
};
Some(Collation {
upward_messages,
new_validation_code: new_validation_code.map(Into::into),
head_data,
proof_of_validity: PoV { block_data },
processed_downward_messages,
horizontal_messages,
hrmp_watermark,
})
})
}
async fn produce_candidate(
mut self,
relay_parent: PHash,
validation_data: PersistedValidationData,
) -> Option<Collation> {
trace!(target: LOG_TARGET, "Producing candidate");
let last_head =
match Block::Header::decode(&mut &validation_data.parent_head.0[..]) {
Ok(x) => x,
Err(e) => {
error!(target: LOG_TARGET, "Could not decode the head data: {:?}", e);
return None;
}
};
let last_head_hash = last_head.hash();
if !self.check_block_status(last_head_hash, &last_head) {
return None;
}
info!(
target: LOG_TARGET,
"Starting collation for relay parent {:?} on parent {:?}.",
relay_parent,
last_head_hash,
);
let proposer_future = self.proposer_factory.lock().init(&last_head);
let proposer = proposer_future
.await
.map_err(|e| {
error!(
target: LOG_TARGET,
"Could not create proposer: {:?}",
e,
)
})
.ok()?;
let inherent_data = self.inherent_data(&validation_data, relay_parent)?;
let Proposal {
block,
storage_changes,
proof,
} = proposer
.propose(
inherent_data,
Default::default(),
//TODO: Fix this.
Duration::from_millis(500),
RecordProof::Yes,
)
.await
.map_err(|e| {
error!(
target: LOG_TARGET,
"Proposing failed: {:?}",
e,
)
})
.ok()?;
let proof = match proof {
Some(proof) => proof,
None => {
error!(
target: LOG_TARGET,
"Proposer did not return the requested proof.",
);
return None;
}
};
let (header, extrinsics) = block.deconstruct();
let block_hash = header.hash();
// Create the parachain block data for the validators.
let b = ParachainBlockData::<Block>::new(header.clone(), extrinsics, proof);
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
block_import_params.body = Some(b.extrinsics().to_vec());
// Best block is determined by the relay chain.
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(false));
block_import_params.storage_changes = Some(storage_changes);
if let Err(err) = self
.block_import
.lock()
.import_block(block_import_params, Default::default())
{
error!(
target: LOG_TARGET,
"Error importing build block (at {:?}): {:?}",
b.header().parent_hash(),
err,
);
return None;
}
trace!(
target: LOG_TARGET,
"PoV size {{ header: {}kb, extrinsics: {}kb, storage_proof: {}kb }}",
b.header().encode().len() as f64 / 1024f64,
b.extrinsics().encode().len() as f64 / 1024f64,
b.storage_proof().encode().len() as f64 / 1024f64,
);
let collation =
self.build_collation(b, block_hash, validation_data.relay_parent_number)?;
let pov_hash = collation.proof_of_validity.hash();
self.wait_to_announce
.lock()
.wait_to_announce(block_hash, pov_hash);
info!(
target: LOG_TARGET,
"Produced proof-of-validity candidate {:?} from block {:?}.",
pov_hash,
block_hash,
);
Some(collation)
}
}
/// Parameters for [`start_collator`].
pub struct StartCollatorParams<Block: BlockT, PF, BI, Backend, BS, Spawner, PClient, PBackend> {
pub proposer_factory: PF,
pub inherent_data_providers: InherentDataProviders,
pub backend: Arc<Backend>,
pub block_import: BI,
pub block_status: Arc<BS>,
pub announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
pub overseer_handler: OverseerHandler,
pub spawner: Spawner,
pub para_id: ParaId,
pub key: CollatorPair,
pub polkadot_client: Arc<PClient>,
pub polkadot_backend: Arc<PBackend>,
}
pub async fn start_collator<
Block: BlockT,
PF,
BI,
Backend,
BS,
Spawner,
PClient,
PBackend,
PBackend2,
PApi,
>(
StartCollatorParams {
proposer_factory,
inherent_data_providers,
backend,
block_import,
block_status,
announce_block,
mut overseer_handler,
spawner,
para_id,
key,
polkadot_client,
polkadot_backend,
}: StartCollatorParams<Block, PF, BI, Backend, BS, Spawner, PClient, PBackend2>,
) -> Result<(), String>
where
PF: Environment<Block> + Send + 'static,
BI: BlockImport<Block, Error = sp_consensus::Error, Transaction = TransactionFor<PF, Block>>
+ Send
+ Sync
+ 'static,
Backend: sc_client_api::Backend<Block> + 'static,
BS: BlockBackend<Block> + Send + Sync + 'static,
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
PBackend: sc_client_api::Backend<PBlock> + 'static,
PBackend::State: StateBackend<BlakeTwo256>,
PApi: RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: polkadot_service::AbstractClient<PBlock, PBackend, Api = PApi> + 'static,
PBackend2: sc_client_api::Backend<PBlock> + 'static,
PBackend2::State: StateBackend<BlakeTwo256>,
{
let collator = Collator::new(
para_id,
proposer_factory,
inherent_data_providers,
overseer_handler.clone(),
block_import,
block_status,
Arc::new(spawner),
announce_block,
backend,
polkadot_client,
polkadot_backend,
);
let config = CollationGenerationConfig {
key,
para_id,
collator: Box::new(move |relay_parent, validation_data| {
let collator = collator.clone();
collator
.produce_candidate(relay_parent, validation_data.clone())
.boxed()
}),
};
overseer_handler
.send_msg(CollationGenerationMessage::Initialize(config))
.await;
overseer_handler
.send_msg(CollatorProtocolMessage::CollateOn(para_id))
.await;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::{pin::Pin, time::Duration};
use sp_core::{testing::TaskExecutor, Pair};
use sp_inherents::InherentData;
use sp_runtime::traits::DigestFor;
use cumulus_test_client::{
Client, DefaultTestClientBuilderExt, InitBlockBuilder, TestClientBuilder,
TestClientBuilderExt,
};
use cumulus_test_runtime::{Block, Header};
use polkadot_node_subsystem::messages::CollationGenerationMessage;
use polkadot_node_subsystem_test_helpers::ForwardSubsystem;
use polkadot_overseer::{AllSubsystems, Overseer};
use futures::{channel::mpsc, executor::block_on, future};
#[derive(Debug)]
struct Error;
impl From<sp_consensus::Error> for Error {
fn from(_: sp_consensus::Error) -> Self {
unimplemented!("Not required in tests")
}
}
struct DummyFactory(Arc<Client>);
impl Environment<Block> for DummyFactory {
type Proposer = DummyProposer;
type Error = Error;
type CreateProposer = Pin<
Box<dyn Future<Output = Result<Self::Proposer, Self::Error>> + Send + Unpin + 'static>,
>;
fn init(&mut self, header: &Header) -> Self::CreateProposer {
Box::pin(future::ready(Ok(DummyProposer {
client: self.0.clone(),
header: header.clone(),
})))
}
}
struct DummyProposer {
client: Arc<Client>,
header: Header,
}
impl Proposer<Block> for DummyProposer {
type Error = Error;
type Proposal = future::Ready<Result<Proposal<Block, Self::Transaction>, Error>>;
type Transaction = sc_client_api::TransactionFor<cumulus_test_client::Backend, Block>;
fn propose(
self,
_: InherentData,
_: DigestFor<Block>,
_: Duration,
_: RecordProof,
) -> Self::Proposal {
let block_id = BlockId::Hash(self.header.hash());
let builder = self
.client
.init_block_builder_at(&block_id, None, Default::default());
let (block, storage_changes, proof) =
builder.build().expect("Creates block").into_inner();
future::ready(Ok(Proposal {
block,
storage_changes,
proof,
}))
}
}
#[test]
fn collates_produces_a_block() {
let _ = env_logger::try_init();
let spawner = TaskExecutor::new();
let para_id = ParaId::from(100);
let announce_block = |_, _| ();
let client_builder = TestClientBuilder::new();
let backend = client_builder.backend();
let client = Arc::new(client_builder.build());
let header = client.header(&BlockId::Number(0)).unwrap().unwrap();
let (sub_tx, sub_rx) = mpsc::channel(64);
let all_subsystems =
AllSubsystems::<()>::dummy().replace_collation_generation(ForwardSubsystem(sub_tx));
let (overseer, handler) = Overseer::new(Vec::new(), all_subsystems, None, spawner.clone())
.expect("Creates overseer");
spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());
let (polkadot_client, polkadot_backend, relay_parent) = {
// Create a polkadot client with a block imported.
use polkadot_test_client::{
ClientBlockImportExt as _, DefaultTestClientBuilderExt as _,
InitPolkadotBlockBuilder as _, TestClientBuilderExt as _,
};
let client_builder = polkadot_test_client::TestClientBuilder::new();
let polkadot_backend = client_builder.backend();
let mut client = client_builder.build();
let block_builder = client.init_polkadot_block_builder();
let block = block_builder.build().expect("Finalizes the block").block;
let hash = block.header().hash();
client
.import_as_best(BlockOrigin::Own, block)
.expect("Imports the block");
(client, polkadot_backend, hash)
};
let collator_start =
start_collator::<_, _, _, _, _, _, _, polkadot_service::FullBackend, _, _>(
StartCollatorParams {
proposer_factory: DummyFactory(client.clone()),
inherent_data_providers: Default::default(),
backend,
block_import: client.clone(),
block_status: client.clone(),
announce_block: Arc::new(announce_block),
overseer_handler: handler,
spawner,
para_id,
key: CollatorPair::generate().0,
polkadot_client: Arc::new(polkadot_client),
polkadot_backend,
},
);
block_on(collator_start).expect("Should start collator");
let msg = block_on(sub_rx.into_future())
.0
.expect("message should be send by `start_collator` above.");
let config = match msg {
CollationGenerationMessage::Initialize(config) => config,
};
let mut validation_data = PersistedValidationData::default();
validation_data.parent_head = header.encode().into();
let collation = block_on((config.collator)(relay_parent, &validation_data))
.expect("Collation is build");
let block_data = collation.proof_of_validity.block_data;
let block = Block::decode(&mut &block_data.0[..]).expect("Is a valid block");
assert_eq!(1, *block.header().number());
}
}
+39
View File
@@ -0,0 +1,39 @@
[package]
name = "cumulus-client-consensus"
description = "Proxy Polkadot's consensus as a consensus engine for Substrate"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
# Substrate deps
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-block-builder = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot deps
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-runtime = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Other deps
futures = { version = "0.3.8", features = ["compat"] }
tokio = "0.1.22"
codec = { package = "parity-scale-codec", version = "2.0.0", features = [ "derive" ] }
tracing = "0.1.22"
[dev-dependencies]
# Substrate deps
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Cumulus dependencies
cumulus-test-runtime = { path = "../../test/runtime" }
cumulus-test-client = { path = "../../test/client" }
# Other deps
futures-timer = "3.0.2"
@@ -0,0 +1,131 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use std::{marker::PhantomData, sync::Arc};
use sp_api::ProvideRuntimeApi;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::Result as ClientResult;
use sp_consensus::{
error::Error as ConsensusError,
import_queue::{BasicQueue, CacheKeyId, Verifier as VerifierT},
BlockImport, BlockImportParams, BlockOrigin, ForkChoiceStrategy,
};
use sp_inherents::InherentDataProviders;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT},
Justification,
};
/// A verifier that just checks the inherents.
struct Verifier<Client, Block> {
client: Arc<Client>,
inherent_data_providers: InherentDataProviders,
_marker: PhantomData<Block>,
}
impl<Client, Block> VerifierT<Block> for Verifier<Client, Block>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + Send + Sync,
<Client as ProvideRuntimeApi<Block>>::Api: BlockBuilderApi<Block>,
{
fn verify(
&mut self,
origin: BlockOrigin,
header: Block::Header,
justification: Option<Justification>,
mut body: Option<Vec<Block::Extrinsic>>,
) -> Result<
(
BlockImportParams<Block, ()>,
Option<Vec<(CacheKeyId, Vec<u8>)>>,
),
String,
> {
if let Some(inner_body) = body.take() {
let inherent_data = self
.inherent_data_providers
.create_inherent_data()
.map_err(|e| e.into_string())?;
let block = Block::new(header.clone(), inner_body);
let inherent_res = self
.client
.runtime_api()
.check_inherents(
&BlockId::Hash(*header.parent_hash()),
block.clone(),
inherent_data,
)
.map_err(|e| format!("{:?}", e))?;
if !inherent_res.ok() {
inherent_res.into_errors().try_for_each(|(i, e)| {
Err(self.inherent_data_providers.error_to_string(&i, &e))
})?;
}
let (_, inner_body) = block.deconstruct();
body = Some(inner_body);
}
let post_hash = Some(header.hash());
let mut block_import_params = BlockImportParams::new(origin, header);
block_import_params.body = body;
block_import_params.justification = justification;
// Best block is determined by the relay chain, or if we are doing the intial sync
// we import all blocks as new best.
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(
origin == BlockOrigin::NetworkInitialSync,
));
block_import_params.post_hash = post_hash;
Ok((block_import_params, None))
}
}
/// Start an import queue for a Cumulus collator that does not uses any special authoring logic.
pub fn import_queue<Client, Block: BlockT, I>(
client: Arc<Client>,
block_import: I,
inherent_data_providers: InherentDataProviders,
spawner: &impl sp_core::traits::SpawnNamed,
registry: Option<&substrate_prometheus_endpoint::Registry>,
) -> ClientResult<BasicQueue<Block, I::Transaction>>
where
I: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
I::Transaction: Send,
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
<Client as ProvideRuntimeApi<Block>>::Api: BlockBuilderApi<Block>,
{
let verifier = Verifier {
client,
inherent_data_providers,
_marker: PhantomData,
};
Ok(BasicQueue::new(
verifier,
Box::new(block_import),
None,
spawner,
registry,
))
}
+840
View File
@@ -0,0 +1,840 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_consensus::{
BlockImport, BlockImportParams, BlockOrigin, BlockStatus, Error as ConsensusError,
ForkChoiceStrategy, SelectChain as SelectChainT,
};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT},
};
use polkadot_primitives::v1::{
Block as PBlock, Id as ParaId, OccupiedCoreAssumption, ParachainHost,
};
use codec::Decode;
use futures::{future, select, FutureExt, Stream, StreamExt};
use std::{marker::PhantomData, sync::Arc};
pub mod import_queue;
/// Errors that can occur while following the polkadot relay-chain.
#[derive(Debug)]
pub enum Error {
/// An underlying client error.
Client(ClientError),
/// Head data returned was not for our parachain.
InvalidHeadData,
}
/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
pub trait RelaychainClient: Clone + 'static {
/// The error type for interacting with the Polkadot client.
type Error: std::fmt::Debug + Send;
/// A stream that yields head-data for a parachain.
type HeadStream: Stream<Item = Vec<u8>> + Send + Unpin;
/// Get a stream of new best heads for the given parachain.
fn new_best_heads(&self, para_id: ParaId) -> ClientResult<Self::HeadStream>;
/// Get a stream of finalized heads for the given parachain.
fn finalized_heads(&self, para_id: ParaId) -> ClientResult<Self::HeadStream>;
/// Returns the parachain head for the given `para_id` at the given block id.
fn parachain_head_at(
&self,
at: &BlockId<PBlock>,
para_id: ParaId,
) -> ClientResult<Option<Vec<u8>>>;
}
/// Follow the finalized head of the given parachain.
///
/// For every finalized block of the relay chain, it will get the included parachain header
/// corresponding to `para_id` and will finalize it in the parachain.
async fn follow_finalized_head<P, Block, B, R>(
para_id: ParaId,
parachain: Arc<P>,
relay_chain: R,
) -> ClientResult<()>
where
Block: BlockT,
P: Finalizer<Block, B> + UsageProvider<Block>,
R: RelaychainClient,
B: Backend<Block>,
{
let mut finalized_heads = relay_chain.finalized_heads(para_id)?;
loop {
let finalized_head = if let Some(h) = finalized_heads.next().await {
h
} else {
tracing::debug!(target: "cumulus-consensus", "Stopping following finalized head.");
return Ok(());
};
let header = match Block::Header::decode(&mut &finalized_head[..]) {
Ok(header) => header,
Err(err) => {
tracing::warn!(
target: "cumulus-consensus",
error = ?err,
"Could not decode parachain header while following finalized heads.",
);
continue;
}
};
let hash = header.hash();
// don't finalize the same block multiple times.
if parachain.usage_info().chain.finalized_hash != hash {
if let Err(e) = parachain.finalize_block(BlockId::hash(hash), None, true) {
match e {
ClientError::UnknownBlock(_) => tracing::debug!(
target: "cumulus-consensus",
block_hash = ?hash,
"Could not finalize block because it is unknown.",
),
_ => tracing::warn!(
target: "cumulus-consensus",
error = ?e,
block_hash = ?hash,
"Failed to finalize block",
),
}
}
}
}
}
/// Run the parachain consensus.
///
/// This will follow the given `relay_chain` to act as consesus for the parachain that corresponds
/// to the given `para_id`. It will set the new best block of the parachain as it gets aware of it.
/// The same happens for the finalized block.
///
/// # Note
///
/// This will access the backend of the parachain and thus, this future should be spawned as blocking
/// task.
pub async fn run_parachain_consensus<P, R, Block, B>(
para_id: ParaId,
parachain: Arc<P>,
relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
) -> ClientResult<()>
where
Block: BlockT,
P: Finalizer<Block, B>
+ UsageProvider<Block>
+ Send
+ Sync
+ BlockBackend<Block>
+ BlockchainEvents<Block>,
for<'a> &'a P: BlockImport<Block>,
R: RelaychainClient,
B: Backend<Block>,
{
let follow_new_best = follow_new_best(
para_id,
parachain.clone(),
relay_chain.clone(),
announce_block,
);
let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain);
select! {
r = follow_new_best.fuse() => r,
r = follow_finalized_head.fuse() => r,
}
}
/// Follow the relay chain new best head, to update the Parachain new best head.
async fn follow_new_best<P, R, Block, B>(
para_id: ParaId,
parachain: Arc<P>,
relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
) -> ClientResult<()>
where
Block: BlockT,
P: Finalizer<Block, B>
+ UsageProvider<Block>
+ Send
+ Sync
+ BlockBackend<Block>
+ BlockchainEvents<Block>,
for<'a> &'a P: BlockImport<Block>,
R: RelaychainClient,
B: Backend<Block>,
{
let mut new_best_heads = relay_chain.new_best_heads(para_id)?.fuse();
let mut imported_blocks = parachain.import_notification_stream().fuse();
// The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain
// block before the parachain block it included. In this case we need to wait for this block to
// be imported to set it as new best.
let mut unset_best_header = None;
loop {
select! {
h = new_best_heads.next() => {
match h {
Some(h) => handle_new_best_parachain_head(
h,
&*parachain,
&*announce_block,
&mut unset_best_header,
),
None => {
tracing::debug!(
target: "cumulus-consensus",
"Stopping following new best.",
);
return Ok(())
}
}
},
i = imported_blocks.next() => {
match i {
Some(i) => handle_new_block_imported(
i,
&mut unset_best_header,
&*parachain,
&*announce_block,
),
None => {
tracing::debug!(
target: "cumulus-consensus",
"Stopping following imported blocks.",
);
return Ok(())
}
}
}
}
}
}
/// Handle a new import block of the parachain.
fn handle_new_block_imported<Block, P>(
notification: BlockImportNotification<Block>,
unset_best_header_opt: &mut Option<Block::Header>,
parachain: &P,
announce_block: &dyn Fn(Block::Hash, Vec<u8>),
) where
Block: BlockT,
P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
for<'a> &'a P: BlockImport<Block>,
{
let unset_best_header = match (notification.is_new_best, &unset_best_header_opt) {
// If this is the new best block or we don't have any unset block, we can end it here.
(true, _) | (_, None) => return,
(false, Some(ref u)) => u,
};
let unset_hash = if notification.header.number() < unset_best_header.number() {
return;
} else if notification.header.number() == unset_best_header.number() {
let unset_hash = unset_best_header.hash();
if unset_hash != notification.hash {
return;
} else {
unset_hash
}
} else {
unset_best_header.hash()
};
match parachain.block_status(&BlockId::Hash(unset_hash)) {
Ok(BlockStatus::InChainWithState) => {
drop(unset_best_header);
let unset_best_header = unset_best_header_opt
.take()
.expect("We checked above that the value is set; qed");
import_block_as_new_best(unset_hash, unset_best_header, parachain, announce_block);
}
state => tracing::debug!(
target: "cumulus-consensus",
unset_best_header = ?unset_best_header,
imported_header = ?notification.header,
?state,
"Unexpected state for unset best header.",
),
}
}
/// Handle the new best parachain head as extracted from the new best relay chain.
fn handle_new_best_parachain_head<Block, P>(
head: Vec<u8>,
parachain: &P,
announce_block: &dyn Fn(Block::Hash, Vec<u8>),
unset_best_header: &mut Option<Block::Header>,
) where
Block: BlockT,
P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
for<'a> &'a P: BlockImport<Block>,
{
let parachain_head = match <<Block as BlockT>::Header>::decode(&mut &head[..]) {
Ok(header) => header,
Err(err) => {
tracing::warn!(
target: "cumulus-consensus",
error = ?err,
"Could not decode Parachain header while following best heads.",
);
return;
}
};
let hash = parachain_head.hash();
if parachain.usage_info().chain.best_hash == hash {
tracing::debug!(
target: "cumulus-consensus",
block_hash = ?hash,
"Skipping set new best block, because block is already the best.",
)
} else {
// Make sure the block is already known or otherwise we skip setting new best.
match parachain.block_status(&BlockId::Hash(hash)) {
Ok(BlockStatus::InChainWithState) => {
unset_best_header.take();
import_block_as_new_best(hash, parachain_head, parachain, announce_block);
}
Ok(BlockStatus::InChainPruned) => {
tracing::error!(
target: "cumulus-collator",
block_hash = ?hash,
"Trying to set pruned block as new best!",
);
}
Ok(BlockStatus::Unknown) => {
*unset_best_header = Some(parachain_head);
tracing::debug!(
target: "cumulus-collator",
block_hash = ?hash,
"Parachain block not yet imported, waiting for import to enact as best block.",
);
}
Err(e) => {
tracing::error!(
target: "cumulus-collator",
block_hash = ?hash,
error = ?e,
"Failed to get block status of block.",
);
}
_ => {}
}
}
}
fn import_block_as_new_best<Block, P>(
hash: Block::Hash,
header: Block::Header,
parachain: &P,
announce_block: &dyn Fn(Block::Hash, Vec<u8>),
) where
Block: BlockT,
P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
for<'a> &'a P: BlockImport<Block>,
{
// Make it the new best block
let mut block_import_params = BlockImportParams::new(BlockOrigin::ConsensusBroadcast, header);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(true));
block_import_params.import_existing = true;
if let Err(err) = (&*parachain).import_block(block_import_params, Default::default()) {
tracing::warn!(
target: "cumulus-consensus",
block_hash = ?hash,
error = ?err,
"Failed to set new best block.",
);
} else {
(*announce_block)(hash, Vec::new());
}
}
impl<T> RelaychainClient for Arc<T>
where
T: sc_client_api::BlockchainEvents<PBlock> + ProvideRuntimeApi<PBlock> + 'static + Send + Sync,
<T as ProvideRuntimeApi<PBlock>>::Api: ParachainHost<PBlock, Error = ClientError>,
{
type Error = ClientError;
type HeadStream = Box<dyn Stream<Item = Vec<u8>> + Send + Unpin>;
fn new_best_heads(&self, para_id: ParaId) -> ClientResult<Self::HeadStream> {
let polkadot = self.clone();
let s = self.import_notification_stream().filter_map(move |n| {
future::ready(if n.is_new_best {
polkadot
.parachain_head_at(&BlockId::hash(n.hash), para_id)
.ok()
.and_then(|h| h)
} else {
None
})
});
Ok(Box::new(s))
}
fn finalized_heads(&self, para_id: ParaId) -> ClientResult<Self::HeadStream> {
let polkadot = self.clone();
let s = self.finality_notification_stream().filter_map(move |n| {
future::ready(
polkadot
.parachain_head_at(&BlockId::hash(n.hash), para_id)
.ok()
.and_then(|h| h),
)
});
Ok(Box::new(s))
}
fn parachain_head_at(
&self,
at: &BlockId<PBlock>,
para_id: ParaId,
) -> ClientResult<Option<Vec<u8>>> {
self.runtime_api()
.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
.map(|s| s.map(|s| s.parent_head.0))
}
}
/// Select chain implementation for parachains.
///
/// The actual behavior of the implementation depends on the select chain implementation used by
/// Polkadot.
pub struct SelectChain<Block, PC, SC> {
polkadot_client: PC,
polkadot_select_chain: SC,
para_id: ParaId,
_marker: PhantomData<Block>,
}
impl<Block, PC, SC> SelectChain<Block, PC, SC> {
/// Create new instance of `Self`.
///
/// - `para_id`: The id of the parachain.
/// - `polkadot_client`: The client of the Polkadot node.
/// - `polkadot_select_chain`: The Polkadot select chain implementation.
pub fn new(para_id: ParaId, polkadot_client: PC, polkadot_select_chain: SC) -> Self {
Self {
polkadot_client,
polkadot_select_chain,
para_id,
_marker: PhantomData,
}
}
}
impl<Block, PC: Clone, SC: Clone> Clone for SelectChain<Block, PC, SC> {
fn clone(&self) -> Self {
Self {
polkadot_client: self.polkadot_client.clone(),
polkadot_select_chain: self.polkadot_select_chain.clone(),
para_id: self.para_id,
_marker: PhantomData,
}
}
}
impl<Block, PC, SC> SelectChainT<Block> for SelectChain<Block, PC, SC>
where
Block: BlockT,
PC: RelaychainClient + Clone + Send + Sync,
PC::Error: ToString,
SC: SelectChainT<PBlock>,
{
fn leaves(&self) -> Result<Vec<<Block as BlockT>::Hash>, ConsensusError> {
let leaves = self.polkadot_select_chain.leaves()?;
leaves
.into_iter()
.filter_map(|l| {
self.polkadot_client
.parachain_head_at(&BlockId::Hash(l), self.para_id)
.map(|h| h.and_then(|d| <<Block as BlockT>::Hash>::decode(&mut &d[..]).ok()))
.transpose()
})
.collect::<Result<Vec<_>, _>>()
.map_err(|e| ConsensusError::ChainLookup(e.to_string()))
}
fn best_chain(&self) -> Result<<Block as BlockT>::Header, ConsensusError> {
let best_chain = self.polkadot_select_chain.best_chain()?;
let para_best_chain = self
.polkadot_client
.parachain_head_at(&BlockId::Hash(best_chain.hash()), self.para_id)
.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?;
match para_best_chain {
Some(best) => Decode::decode(&mut &best[..]).map_err(|e| {
ConsensusError::ChainLookup(format!("Error decoding parachain head: {}", e))
}),
None => Err(ConsensusError::ChainLookup(
"Could not find parachain head for best relay chain!".into(),
)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use codec::Encode;
use cumulus_test_client::{
runtime::{Block, Header},
Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt,
};
use futures::{channel::mpsc, executor::block_on};
use futures_timer::Delay;
use std::{sync::Mutex, time::Duration};
struct RelaychainInner {
new_best_heads: Option<mpsc::UnboundedReceiver<Header>>,
finalized_heads: Option<mpsc::UnboundedReceiver<Header>>,
new_best_heads_sender: mpsc::UnboundedSender<Header>,
finalized_heads_sender: mpsc::UnboundedSender<Header>,
}
impl RelaychainInner {
fn new() -> Self {
let (new_best_heads_sender, new_best_heads) = mpsc::unbounded();
let (finalized_heads_sender, finalized_heads) = mpsc::unbounded();
Self {
new_best_heads_sender,
finalized_heads_sender,
new_best_heads: Some(new_best_heads),
finalized_heads: Some(finalized_heads),
}
}
}
#[derive(Clone)]
struct Relaychain {
inner: Arc<Mutex<RelaychainInner>>,
}
impl Relaychain {
fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(RelaychainInner::new())),
}
}
}
impl RelaychainClient for Relaychain {
type Error = ClientError;
type HeadStream = Box<dyn Stream<Item = Vec<u8>> + Send + Unpin>;
fn new_best_heads(&self, _: ParaId) -> ClientResult<Self::HeadStream> {
let stream = self
.inner
.lock()
.unwrap()
.new_best_heads
.take()
.expect("Should only be called once");
Ok(Box::new(stream.map(|v| v.encode())))
}
fn finalized_heads(&self, _: ParaId) -> ClientResult<Self::HeadStream> {
let stream = self
.inner
.lock()
.unwrap()
.finalized_heads
.take()
.expect("Should only be called once");
Ok(Box::new(stream.map(|v| v.encode())))
}
fn parachain_head_at(
&self,
_: &BlockId<PBlock>,
_: ParaId,
) -> ClientResult<Option<Vec<u8>>> {
unimplemented!("Not required for tests")
}
}
fn build_and_import_block(mut client: Arc<Client>) -> Block {
let builder = client.init_block_builder(None, Default::default());
let block = builder.build().unwrap().block;
let (header, body) = block.clone().deconstruct();
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(false));
block_import_params.body = Some(body);
client
.import_block(block_import_params, Default::default())
.unwrap();
assert_eq!(0, client.chain_info().best_number);
block
}
#[test]
fn follow_new_best_works() {
sp_tracing::try_init_simple();
let client = Arc::new(TestClientBuilder::default().build());
let block = build_and_import_block(client.clone());
let relay_chain = Relaychain::new();
let new_best_heads_sender = relay_chain
.inner
.lock()
.unwrap()
.new_best_heads_sender
.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
new_best_heads_sender
.unbounded_send(block.header().clone())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.best_hash {
break;
}
}
};
block_on(async move {
futures::pin_mut!(consensus);
futures::pin_mut!(work);
select! {
r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
_ = work.fuse() => {},
}
});
}
#[test]
fn follow_finalized_works() {
sp_tracing::try_init_simple();
let client = Arc::new(TestClientBuilder::default().build());
let block = build_and_import_block(client.clone());
let relay_chain = Relaychain::new();
let finalized_sender = relay_chain
.inner
.lock()
.unwrap()
.finalized_heads_sender
.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
finalized_sender
.unbounded_send(block.header().clone())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.finalized_hash {
break;
}
}
};
block_on(async move {
futures::pin_mut!(consensus);
futures::pin_mut!(work);
select! {
r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
_ = work.fuse() => {},
}
});
}
#[test]
fn follow_finalized_does_not_stop_on_unknown_block() {
sp_tracing::try_init_simple();
let client = Arc::new(TestClientBuilder::default().build());
let block = build_and_import_block(client.clone());
let unknown_block = {
let block_builder = client.init_block_builder_at(
&BlockId::Hash(block.hash()),
None,
Default::default(),
);
block_builder.build().unwrap().block
};
let relay_chain = Relaychain::new();
let finalized_sender = relay_chain
.inner
.lock()
.unwrap()
.finalized_heads_sender
.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
for _ in 0..3usize {
finalized_sender
.unbounded_send(unknown_block.header().clone())
.unwrap();
Delay::new(Duration::from_millis(100)).await;
}
finalized_sender
.unbounded_send(block.header().clone())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.finalized_hash {
break;
}
}
};
block_on(async move {
futures::pin_mut!(consensus);
futures::pin_mut!(work);
select! {
r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
_ = work.fuse() => {},
}
});
}
// It can happen that we first import a relay chain block, while not yet having the parachain
// block imported that would be set to the best block. We need to make sure to import this
// block as new best block in the moment it is imported.
#[test]
fn follow_new_best_sets_best_after_it_is_imported() {
sp_tracing::try_init_simple();
let mut client = Arc::new(TestClientBuilder::default().build());
let block = build_and_import_block(client.clone());
let unknown_block = {
let block_builder = client.init_block_builder_at(
&BlockId::Hash(block.hash()),
None,
Default::default(),
);
block_builder.build().unwrap().block
};
let relay_chain = Relaychain::new();
let new_best_heads_sender = relay_chain
.inner
.lock()
.unwrap()
.new_best_heads_sender
.clone();
let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
let work = async move {
new_best_heads_sender
.unbounded_send(block.header().clone())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if block.hash() == client.usage_info().chain.best_hash {
break;
}
}
// Announce the unknown block
new_best_heads_sender
.unbounded_send(unknown_block.header().clone())
.unwrap();
// Do some iterations. As this is a local task executor, only one task can run at a time.
// Meaning that it should already have processed the unknown block.
for _ in 0..3usize {
Delay::new(Duration::from_millis(100)).await;
}
let (header, body) = unknown_block.clone().deconstruct();
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(false));
block_import_params.body = Some(body);
// Now import the unkown block to make it "known"
client
.import_block(block_import_params, Default::default())
.unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
if unknown_block.hash() == client.usage_info().chain.best_hash {
break;
}
}
};
block_on(async move {
futures::pin_mut!(consensus);
futures::pin_mut!(work);
select! {
r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
_ = work.fuse() => {},
}
});
}
}
+46
View File
@@ -0,0 +1,46 @@
[package]
name = "cumulus-client-network"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
description = "Cumulus-specific networking protocol"
edition = "2018"
[dependencies]
# Substrate deps
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot deps
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-statement-table = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-node-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-parachain = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# other deps
codec = { package = "parity-scale-codec", version = "2.0.0", features = [ "derive" ] }
futures = { version = "0.3.1", features = ["compat"] }
futures-timer = "3.0.2"
log = "0.4.8"
parking_lot = "0.10.2"
derive_more = "0.99.2"
[dev-dependencies]
# Cumulus deps
cumulus-test-service = { path = "../../test/service" }
# Polkadot deps
polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# substrate deps
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
+571
View File
@@ -0,0 +1,571 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Parachain specific networking
//!
//! Provides a custom block announcement implementation for parachains
//! that use the relay chain provided consensus. See [`BlockAnnounceValidator`]
//! and [`WaitToAnnounce`] for more information about this implementation.
#[cfg(test)]
mod tests;
mod wait_on_relay_chain_block;
use sc_client_api::{Backend, BlockchainEvents};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus::{
block_validation::{BlockAnnounceValidator as BlockAnnounceValidatorT, Validation},
SyncOracle,
};
use sp_core::traits::SpawnNamed;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, HashFor, Header as HeaderT},
};
use polkadot_node_primitives::{SignedFullStatement, Statement};
use polkadot_node_subsystem::messages::StatementDistributionMessage;
use polkadot_overseer::OverseerHandler;
use polkadot_primitives::v1::{
Block as PBlock, CandidateReceipt, CompactStatement, Hash as PHash, Id as ParaId,
OccupiedCoreAssumption, ParachainHost, SignedStatement, SigningContext,
};
use polkadot_service::ClientHandle;
use codec::{Decode, Encode};
use futures::{
channel::{mpsc, oneshot},
future::{ready, FutureExt},
pin_mut, select, Future, StreamExt,
};
use log::trace;
use std::{convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc};
use wait_on_relay_chain_block::WaitOnRelayChainBlock;
const LOG_TARGET: &str = "cumulus-network";
type BoxedError = Box<dyn std::error::Error + Send>;
#[derive(Debug)]
struct BlockAnnounceError(String);
impl std::error::Error for BlockAnnounceError {}
impl fmt::Display for BlockAnnounceError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
/// The data that we attach to a block announcement.
///
/// This will be used to prove that a header belongs to a block that is probably being backed by
/// the relay chain.
#[derive(Encode, Decode, Debug)]
pub struct BlockAnnounceData {
receipt: CandidateReceipt,
statement: SignedStatement,
}
impl BlockAnnounceData {
/// Validate that the receipt, statement and announced header match.
///
/// This will not check the signature, for this you should use [`BlockAnnounceData::check_signature`].
fn validate(&self, encoded_header: Vec<u8>) -> Result<(), Validation> {
let candidate_hash = if let CompactStatement::Candidate(h) = self.statement.payload() {
h
} else {
log::debug!(
target: LOG_TARGET,
"`CompactStatement` isn't the candidate variant!",
);
return Err(Validation::Failure { disconnect: true });
};
if *candidate_hash != self.receipt.hash() {
log::debug!(
target: LOG_TARGET,
"Receipt candidate hash doesn't match candidate hash in statement",
);
return Err(Validation::Failure { disconnect: true });
}
if polkadot_parachain::primitives::HeadData(encoded_header).hash()
!= self.receipt.descriptor.para_head
{
log::debug!(
target: LOG_TARGET,
"Receipt para head hash doesn't match the hash of the header in the block announcement",
);
return Err(Validation::Failure { disconnect: true });
}
Ok(())
}
/// Check the signature of the statement.
///
/// Returns an `Err(_)` if it failed.
fn check_signature<P>(&self, relay_chain_client: &Arc<P>) -> Result<Validation, BlockAnnounceError>
where
P: ProvideRuntimeApi<PBlock> + Send + Sync + 'static,
P::Api: ParachainHost<PBlock>,
{
let runtime_api = relay_chain_client.runtime_api();
let validator_index = self.statement.validator_index();
let runtime_api_block_id = BlockId::Hash(self.receipt.descriptor.relay_parent);
let session_index = match runtime_api.session_index_for_child(&runtime_api_block_id) {
Ok(r) => r,
Err(e) => {
return Err(BlockAnnounceError(format!("{:?}", e)));
}
};
let signing_context = SigningContext {
parent_hash: self.receipt.descriptor.relay_parent,
session_index,
};
// Check that the signer is a legit validator.
let authorities = match runtime_api.validators(&runtime_api_block_id) {
Ok(r) => r,
Err(e) => {
return Err(BlockAnnounceError(format!("{:?}", e)));
}
};
let signer = match authorities.get(validator_index as usize) {
Some(r) => r,
None => {
log::debug!(
target: LOG_TARGET,
"Block announcement justification signer is a validator index out of bound",
);
return Ok(Validation::Failure { disconnect: true })
}
};
// Check statement is correctly signed.
if self
.statement
.check_signature(&signing_context, &signer)
.is_err()
{
log::debug!(
target: LOG_TARGET,
"Block announcement justification signature is invalid.",
);
return Ok(Validation::Failure { disconnect: true });
}
Ok(Validation::Success { is_new_best: true })
}
}
impl TryFrom<SignedFullStatement> for BlockAnnounceData {
type Error = ();
fn try_from(stmt: SignedFullStatement) -> Result<BlockAnnounceData, ()> {
let receipt = if let Statement::Seconded(receipt) = stmt.payload() {
receipt.to_plain()
} else {
return Err(());
};
Ok(BlockAnnounceData {
receipt,
statement: stmt.convert_payload(),
})
}
}
/// Parachain specific block announce validator.
///
/// This block announce validator is required if the parachain is running
/// with the relay chain provided consensus to make sure each node only
/// imports a reasonable number of blocks per round. The relay chain provided
/// consensus doesn't have any authorities and so it could happen that without
/// this special block announce validator a node would need to import *millions*
/// of blocks per round, which is clearly not doable.
///
/// To solve this problem, each block announcement is delayed until a collator
/// has received a [`Statement::Seconded`] for its `PoV`. This message tells the
/// collator that its `PoV` was validated successfully by a parachain validator and
/// that it is very likely that this `PoV` will be included in the relay chain. Every
/// collator that doesn't receive the message for its `PoV` will not announce its block.
/// For more information on the block announcement, see [`WaitToAnnounce`].
///
/// For each block announcement that is received, the generic block announcement validation
/// will call this validator and provides the extra data that was attached to the announcement.
/// We call this extra data `justification`.
/// It is expected that the attached data is a SCALE encoded [`BlockAnnounceData`]. The
/// statement is checked to be a [`CompactStatement::Candidate`] and that it is signed by an active
/// parachain validator.
///
/// If no justification was provided we check if the block announcement is at the tip of the known
/// chain. If it is at the tip, it is required to provide a justification or otherwise we reject
/// it. However, if the announcement is for a block below the tip the announcement is accepted
/// as it probably comes from a node that is currently syncing the chain.
pub struct BlockAnnounceValidator<Block, P, B, BCE> {
phantom: PhantomData<Block>,
relay_chain_client: Arc<P>,
relay_chain_backend: Arc<B>,
para_id: ParaId,
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
wait_on_relay_chain_block: WaitOnRelayChainBlock<B, BCE>,
}
impl<Block, P, B, BCE> BlockAnnounceValidator<Block, P, B, BCE> {
/// Create a new [`BlockAnnounceValidator`].
pub fn new(
relay_chain_client: Arc<P>,
para_id: ParaId,
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_backend: Arc<B>,
relay_chain_blockchain_events: Arc<BCE>,
) -> Self {
Self {
phantom: Default::default(),
relay_chain_client,
para_id,
relay_chain_sync_oracle,
relay_chain_backend: relay_chain_backend.clone(),
wait_on_relay_chain_block: WaitOnRelayChainBlock::new(
relay_chain_backend,
relay_chain_blockchain_events,
),
}
}
}
impl<Block: BlockT, P, B, BCE> BlockAnnounceValidator<Block, P, B, BCE>
where
P: ProvideRuntimeApi<PBlock> + Send + Sync + 'static,
P::Api: ParachainHost<PBlock>,
B: Backend<PBlock> + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<B, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
{
/// Handle a block announcement with empty data (no statement) attached to it.
fn handle_empty_block_announce_data(
&self,
header: Block::Header,
) -> impl Future<Output = Result<Validation, BoxedError>> {
let relay_chain_client = self.relay_chain_client.clone();
let relay_chain_backend = self.relay_chain_backend.clone();
let para_id = self.para_id;
async move {
// Check if block is equal or higher than best (this requires a justification)
let relay_chain_info = relay_chain_backend.blockchain().info();
let runtime_api_block_id = BlockId::Hash(relay_chain_info.best_hash);
let block_number = header.number();
let local_validation_data = relay_chain_client
.runtime_api()
.persisted_validation_data(
&runtime_api_block_id,
para_id,
OccupiedCoreAssumption::TimedOut,
)
.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?
.ok_or_else(|| {
Box::new(BlockAnnounceError(
"Could not find parachain head in relay chain".into(),
)) as Box<_>
})?;
let parent_head = Block::Header::decode(&mut &local_validation_data.parent_head.0[..])
.map_err(|e| {
Box::new(BlockAnnounceError(format!(
"Failed to decode parachain head: {:?}",
e
))) as Box<_>
})?;
let known_best_number = parent_head.number();
if block_number >= known_best_number {
trace!(
target: "cumulus-network",
"validation failed because a justification is needed if the block at the top of the chain."
);
Ok(Validation::Failure { disconnect: false })
} else {
Ok(Validation::Success { is_new_best: false })
}
}
}
}
impl<Block: BlockT, P, B, BCE> BlockAnnounceValidatorT<Block>
for BlockAnnounceValidator<Block, P, B, BCE>
where
P: ProvideRuntimeApi<PBlock> + Send + Sync + 'static,
P::Api: ParachainHost<PBlock>,
B: Backend<PBlock> + 'static,
BCE: BlockchainEvents<PBlock> + 'static + Send + Sync,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<B, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
{
fn validate(
&mut self,
header: &Block::Header,
mut data: &[u8],
) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
if self.relay_chain_sync_oracle.is_major_syncing() {
return ready(Ok(Validation::Success { is_new_best: false })).boxed();
}
if data.is_empty() {
return self
.handle_empty_block_announce_data(header.clone())
.boxed();
}
let block_announce_data = match BlockAnnounceData::decode(&mut data) {
Ok(r) => r,
Err(_) => {
return ready(Err(Box::new(BlockAnnounceError(
"Can not decode the `BlockAnnounceData`".into(),
)) as Box<_>))
.boxed()
}
};
let relay_chain_client = self.relay_chain_client.clone();
let header_encoded = header.encode();
let wait_on_relay_chain_block = self.wait_on_relay_chain_block.clone();
async move {
if let Err(e) = block_announce_data.validate(header_encoded) {
return Ok(e);
}
let relay_parent = block_announce_data.receipt.descriptor.relay_parent;
wait_on_relay_chain_block
.wait_on_relay_chain_block(relay_parent)
.await
.map_err(|e| Box::new(BlockAnnounceError(e.to_string())) as Box<_>)?;
block_announce_data
.check_signature(&relay_chain_client)
.map_err(|e| Box::new(e) as Box<_>)
}
.boxed()
}
}
/// Build a block announce validator instance.
///
/// Returns a boxed [`BlockAnnounceValidator`].
pub fn build_block_announce_validator<Block: BlockT, B>(
relay_chain_client: polkadot_service::Client,
para_id: ParaId,
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_backend: Arc<B>,
) -> Box<dyn BlockAnnounceValidatorT<Block> + Send>
where
B: Backend<PBlock> + Send + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<B, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
{
BlockAnnounceValidatorBuilder::new(
relay_chain_client,
para_id,
relay_chain_sync_oracle,
relay_chain_backend,
)
.build()
}
/// Block announce validator builder.
///
/// Builds a [`BlockAnnounceValidator`] for a parachain. As this requires
/// a concrete relay chain client instance, the builder takes a [`polkadot_service::Client`]
/// that wraps this concrete instanace. By using [`polkadot_service::ExecuteWithClient`]
/// the builder gets access to this concrete instance.
struct BlockAnnounceValidatorBuilder<Block, B> {
phantom: PhantomData<Block>,
relay_chain_client: polkadot_service::Client,
para_id: ParaId,
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_backend: Arc<B>,
}
impl<Block: BlockT, B> BlockAnnounceValidatorBuilder<Block, B>
where
B: Backend<PBlock> + Send + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<B, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
{
/// Create a new instance of the builder.
fn new(
relay_chain_client: polkadot_service::Client,
para_id: ParaId,
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_backend: Arc<B>,
) -> Self {
Self {
relay_chain_client,
para_id,
relay_chain_sync_oracle,
relay_chain_backend,
phantom: PhantomData,
}
}
/// Build the block announce validator.
fn build(self) -> Box<dyn BlockAnnounceValidatorT<Block> + Send> {
self.relay_chain_client.clone().execute_with(self)
}
}
impl<Block: BlockT, B> polkadot_service::ExecuteWithClient
for BlockAnnounceValidatorBuilder<Block, B>
where
B: Backend<PBlock> + Send + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<B, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
{
type Output = Box<dyn BlockAnnounceValidatorT<Block> + Send>;
fn execute_with_client<PClient, Api, PBackend>(self, client: Arc<PClient>) -> Self::Output
where
<Api as sp_api::ApiExt<PBlock>>::StateBackend:
sp_api::StateBackend<sp_runtime::traits::BlakeTwo256>,
PBackend: Backend<PBlock>,
PBackend::State: sp_api::StateBackend<sp_runtime::traits::BlakeTwo256>,
Api: polkadot_service::RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: polkadot_service::AbstractClient<PBlock, PBackend, Api = Api> + 'static,
{
Box::new(BlockAnnounceValidator::new(
client.clone(),
self.para_id,
self.relay_chain_sync_oracle,
self.relay_chain_backend,
client,
))
}
}
/// Wait before announcing a block that a candidate message has been received for this block, then
/// add this message as justification for the block announcement.
///
/// This object will spawn a new task every time the method `wait_to_announce` is called and cancel
/// the previous task running.
pub struct WaitToAnnounce<Block: BlockT> {
spawner: Arc<dyn SpawnNamed + Send + Sync>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
overseer_handler: OverseerHandler,
current_trigger: oneshot::Sender<()>,
}
impl<Block: BlockT> WaitToAnnounce<Block> {
/// Create the `WaitToAnnounce` object
pub fn new(
spawner: Arc<dyn SpawnNamed + Send + Sync>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
overseer_handler: OverseerHandler,
) -> WaitToAnnounce<Block> {
let (tx, _rx) = oneshot::channel();
WaitToAnnounce {
spawner,
announce_block,
overseer_handler,
current_trigger: tx,
}
}
/// Wait for a candidate message for the block, then announce the block. The candidate
/// message will be added as justification to the block announcement.
pub fn wait_to_announce(&mut self, block_hash: <Block as BlockT>::Hash, pov_hash: PHash) {
let (tx, rx) = oneshot::channel();
let announce_block = self.announce_block.clone();
let overseer_handler = self.overseer_handler.clone();
self.current_trigger = tx;
self.spawner.spawn(
"cumulus-wait-to-announce",
async move {
let t1 = wait_to_announce::<Block>(
block_hash,
pov_hash,
announce_block,
overseer_handler,
)
.fuse();
let t2 = rx.fuse();
pin_mut!(t1, t2);
trace!(
target: "cumulus-network",
"waiting for announce block in a background task...",
);
select! {
_ = t1 => {
trace!(
target: "cumulus-network",
"block announcement finished",
);
},
_ = t2 => {
trace!(
target: "cumulus-network",
"previous task that waits for announce block has been canceled",
);
}
}
}
.boxed(),
);
}
}
async fn wait_to_announce<Block: BlockT>(
block_hash: <Block as BlockT>::Hash,
pov_hash: PHash,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
mut overseer_handler: OverseerHandler,
) {
let (sender, mut receiver) = mpsc::channel(5);
overseer_handler
.send_msg(StatementDistributionMessage::RegisterStatementListener(
sender,
))
.await;
while let Some(statement) = receiver.next().await {
match statement.payload() {
Statement::Seconded(c) if &c.descriptor.pov_hash == &pov_hash => {
if let Ok(data) = BlockAnnounceData::try_from(statement) {
announce_block(block_hash, data.encode());
}
break;
}
_ => {}
}
}
}
+422
View File
@@ -0,0 +1,422 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use cumulus_test_service::runtime::{Block, Header};
use futures::{executor::block_on, poll, task::Poll};
use polkadot_node_primitives::{SignedFullStatement, Statement};
use polkadot_primitives::v1::{
Block as PBlock, BlockNumber, CandidateCommitments, CandidateDescriptor, CandidateEvent,
CommittedCandidateReceipt, CoreState, GroupRotationInfo, Hash as PHash, HeadData, Id as ParaId,
InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, ParachainHost,
PersistedValidationData, SessionIndex, SessionInfo, SigningContext, ValidationCode,
ValidatorId, ValidatorIndex,
};
use polkadot_test_client::{
Client as PClient, ClientBlockImportExt, DefaultTestClientBuilderExt, FullBackend as PBackend,
InitPolkadotBlockBuilder, TestClientBuilder, TestClientBuilderExt,
};
use sp_api::{ApiRef, ProvideRuntimeApi};
use sp_blockchain::HeaderBackend;
use sp_consensus::{block_validation::BlockAnnounceValidator as _, BlockOrigin};
use sp_core::H256;
use sp_keyring::Sr25519Keyring;
use sp_keystore::{testing::KeyStore, SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::RuntimeAppPublic;
use std::collections::BTreeMap;
fn check_error(error: crate::BoxedError, check_error: impl Fn(&BlockAnnounceError) -> bool) {
let error = *error
.downcast::<BlockAnnounceError>()
.expect("Downcasts error to `ClientError`");
if !check_error(&error) {
panic!("Invalid error: {:?}", error);
}
}
#[derive(Clone)]
struct DummyCollatorNetwork;
impl SyncOracle for DummyCollatorNetwork {
fn is_major_syncing(&mut self) -> bool {
false
}
fn is_offline(&mut self) -> bool {
unimplemented!("Not required in tests")
}
}
fn make_validator_and_api() -> (
BlockAnnounceValidator<Block, TestApi, PBackend, PClient>,
Arc<TestApi>,
) {
let api = Arc::new(TestApi::new());
(
BlockAnnounceValidator::new(
api.clone(),
ParaId::from(56),
Box::new(DummyCollatorNetwork),
api.relay_backend.clone(),
api.relay_client.clone(),
),
api,
)
}
fn default_header() -> Header {
Header {
number: 1,
digest: Default::default(),
extrinsics_root: Default::default(),
parent_hash: Default::default(),
state_root: Default::default(),
}
}
/// Same as [`make_gossip_message_and_header`], but using the genesis header as relay parent.
async fn make_gossip_message_and_header_using_genesis(
api: Arc<TestApi>,
validator_index: u32,
) -> (SignedFullStatement, Header) {
let relay_parent = api
.relay_client
.hash(0)
.ok()
.flatten()
.expect("Genesis hash exists");
make_gossip_message_and_header(api, relay_parent, validator_index).await
}
async fn make_gossip_message_and_header(
api: Arc<TestApi>,
relay_parent: H256,
validator_index: u32,
) -> (SignedFullStatement, Header) {
let keystore: SyncCryptoStorePtr = Arc::new(KeyStore::new());
let alice_public = SyncCryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Alice.to_seed()),
)
.unwrap();
let session_index = api
.runtime_api()
.session_index_for_child(&BlockId::Hash(relay_parent))
.unwrap();
let signing_context = SigningContext {
parent_hash: relay_parent,
session_index,
};
let header = default_header();
let candidate_receipt = CommittedCandidateReceipt {
commitments: CandidateCommitments {
head_data: header.encode().into(),
..Default::default()
},
descriptor: CandidateDescriptor {
relay_parent,
para_head: polkadot_parachain::primitives::HeadData(header.encode()).hash(),
..Default::default()
},
};
let statement = Statement::Seconded(candidate_receipt);
let signed = SignedFullStatement::sign(
&keystore,
statement,
&signing_context,
validator_index,
&alice_public.into(),
)
.await
.expect("Signing statement");
(signed, header)
}
#[test]
fn valid_if_no_data_and_less_than_best_known_number() {
let mut validator = make_validator_and_api().0;
let header = Header {
number: 0,
..default_header()
};
let res = block_on(validator.validate(&header, &[]));
assert_eq!(
res.unwrap(),
Validation::Success { is_new_best: false },
"validating without data with block number < best known number is always a success",
);
}
#[test]
fn invalid_if_no_data_exceeds_best_known_number() {
let mut validator = make_validator_and_api().0;
let header = Header {
number: 1,
..default_header()
};
let res = block_on(validator.validate(&header, &[]));
assert_eq!(
res.unwrap(),
Validation::Failure { disconnect: false },
"validation fails if no justification and block number >= best known number",
);
}
#[test]
fn check_statement_is_encoded_correctly() {
let mut validator = make_validator_and_api().0;
let header = default_header();
let res = block_on(validator.validate(&header, &[0x42]))
.err()
.expect("Should fail on invalid encoded statement");
check_error(res, |error| {
matches!(
error,
BlockAnnounceError(x) if x.contains("Can not decode the `BlockAnnounceData`")
)
});
}
#[test]
fn check_signer_is_legit_validator() {
let (mut validator, api) = make_validator_and_api();
let (signed_statement, header) = block_on(make_gossip_message_and_header_using_genesis(api, 1));
let data = BlockAnnounceData::try_from(signed_statement).unwrap().encode();
let res = block_on(validator.validate(&header, &data));
assert_eq!(Validation::Failure { disconnect: true }, res.unwrap());
}
#[test]
fn check_statement_is_correctly_signed() {
let (mut validator, api) = make_validator_and_api();
let (signed_statement, header) = block_on(make_gossip_message_and_header_using_genesis(api, 0));
let mut data = BlockAnnounceData::try_from(signed_statement).unwrap().encode();
// The signature comes at the end of the type, so change a bit to make the signature invalid.
let last = data.len() - 1;
data[last] = data[last].wrapping_add(1);
let res = block_on(validator.validate(&header, &data));
assert_eq!(Validation::Failure { disconnect: true }, res.unwrap());
}
#[test]
fn check_statement_seconded() {
let (mut validator, api) = make_validator_and_api();
let header = default_header();
let relay_parent = H256::from_low_u64_be(1);
let keystore: SyncCryptoStorePtr = Arc::new(KeyStore::new());
let alice_public = SyncCryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Alice.to_seed()),
)
.unwrap();
let session_index = api
.runtime_api()
.session_index_for_child(&BlockId::Hash(relay_parent))
.unwrap();
let signing_context = SigningContext {
parent_hash: relay_parent,
session_index,
};
let statement = Statement::Valid(Default::default());
let signed_statement = block_on(SignedFullStatement::sign(
&keystore,
statement,
&signing_context,
0,
&alice_public.into(),
))
.expect("Signs statement");
let data = BlockAnnounceData {
receipt: Default::default(),
statement: signed_statement.convert_payload(),
}.encode();
let res = block_on(validator.validate(&header, &data));
assert_eq!(Validation::Failure { disconnect: true }, res.unwrap());
}
#[test]
fn check_header_match_candidate_receipt_header() {
let (mut validator, api) = make_validator_and_api();
let (signed_statement, mut header) =
block_on(make_gossip_message_and_header_using_genesis(api, 0));
let data = BlockAnnounceData::try_from(signed_statement).unwrap().encode();
header.number = 300;
let res = block_on(validator.validate(&header, &data));
assert_eq!(Validation::Failure { disconnect: true }, res.unwrap());
}
/// Test that ensures that we postpone the block announce verification until
/// a relay chain block is imported. This is important for when we receive a
/// block announcement before we have imported the associated relay chain block
/// which can happen on slow nodes or nodes with a slow network connection.
#[test]
fn relay_parent_not_imported_when_block_announce_is_processed() {
block_on(async move {
let (mut validator, api) = make_validator_and_api();
let mut client = api.relay_client.clone();
let block = client
.init_polkadot_block_builder()
.build()
.expect("Build new block")
.block;
let (signed_statement, header) = make_gossip_message_and_header(api, block.hash(), 0).await;
let data = BlockAnnounceData::try_from(signed_statement).unwrap().encode();
let mut validation = validator.validate(&header, &data);
// The relay chain block is not available yet, so the first poll should return
// that the future is still pending.
assert!(poll!(&mut validation).is_pending());
client
.import(BlockOrigin::Own, block)
.expect("Imports the block");
assert!(matches!(
poll!(validation),
Poll::Ready(Ok(Validation::Success { is_new_best: true }))
));
});
}
#[derive(Default)]
struct ApiData {
validators: Vec<ValidatorId>,
}
struct TestApi {
data: Arc<ApiData>,
relay_client: Arc<PClient>,
relay_backend: Arc<PBackend>,
}
impl TestApi {
fn new() -> Self {
let builder = TestClientBuilder::new();
let relay_backend = builder.backend();
Self {
data: Arc::new(ApiData {
validators: vec![Sr25519Keyring::Alice.public().into()],
}),
relay_client: Arc::new(builder.build()),
relay_backend,
}
}
}
#[derive(Default)]
struct RuntimeApi {
data: Arc<ApiData>,
}
impl ProvideRuntimeApi<PBlock> for TestApi {
type Api = RuntimeApi;
fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> {
RuntimeApi {
data: self.data.clone(),
}
.into()
}
}
sp_api::mock_impl_runtime_apis! {
impl ParachainHost<PBlock> for RuntimeApi {
type Error = sp_blockchain::Error;
fn validators(&self) -> Vec<ValidatorId> {
self.data.validators.clone()
}
fn validator_groups(&self) -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo<BlockNumber>) {
(Vec::new(), GroupRotationInfo { session_start_block: 0, group_rotation_frequency: 0, now: 0 })
}
fn availability_cores(&self) -> Vec<CoreState<PHash>> {
Vec::new()
}
fn persisted_validation_data(&self, _: ParaId, _: OccupiedCoreAssumption) -> Option<PersistedValidationData<BlockNumber>> {
Some(PersistedValidationData {
parent_head: HeadData(default_header().encode()),
..Default::default()
})
}
fn session_index_for_child(&self) -> SessionIndex {
0
}
fn validation_code(&self, _: ParaId, _: OccupiedCoreAssumption) -> Option<ValidationCode> {
None
}
fn candidate_pending_availability(&self, _: ParaId) -> Option<CommittedCandidateReceipt<PHash>> {
None
}
fn candidate_events(&self) -> Vec<CandidateEvent<PHash>> {
Vec::new()
}
fn session_info(_: SessionIndex) -> Option<SessionInfo> {
None
}
fn check_validation_outputs(_: ParaId, _: CandidateCommitments) -> bool {
false
}
fn dmq_contents(_: ParaId) -> Vec<InboundDownwardMessage<BlockNumber>> {
Vec::new()
}
fn historical_validation_code(_: ParaId, _: BlockNumber) -> Option<ValidationCode> {
None
}
fn inbound_hrmp_channels_contents(
_: ParaId,
) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumber>>> {
BTreeMap::new()
}
}
}
@@ -0,0 +1,264 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Provides the [`WaitOnRelayChainBlock`] type.
use futures::{future::ready, Future, FutureExt, StreamExt};
use polkadot_primitives::v1::{Block as PBlock, Hash as PHash};
use sc_client_api::{
blockchain::{self, BlockStatus, HeaderBackend},
Backend, BlockchainEvents,
};
use sp_runtime::{generic::BlockId, traits::HashFor};
use std::{sync::Arc, time::Duration};
/// The timeout in seconds after that the waiting for a block should be aborted.
const TIMEOUT_IN_SECONDS: u64 = 6;
/// Custom error type used by [`WaitOnRelayChainBlock`].
#[derive(Debug, derive_more::Display)]
pub enum Error {
#[display(
fmt = "Timeout while waiting for relay-chain block `{}` to be imported.",
_0
)]
Timeout(PHash),
#[display(
fmt = "Import listener closed while waiting for relay-chain block `{}` to be imported.",
_0
)]
ImportListenerClosed(PHash),
#[display(
fmt = "Blockchain returned an error while waiting for relay-chain block `{}` to be imported: {:?}",
_0,
_1
)]
BlockchainError(PHash, blockchain::Error),
}
/// A helper to wait for a given relay chain block in an async way.
///
/// The caller needs to pass the hash of a block it waits for and the function will return when the
/// block is available or an error occurred.
///
/// The waiting for the block is implemented as follows:
///
/// 1. Get a read lock on the import lock from the backend.
///
/// 2. Check if the block is already imported. If yes, return from the function.
///
/// 3. If the block isn't imported yet, add an import notification listener.
///
/// 4. Poll the import notification listener until the block is imported or the timeout is fired.
///
/// The timeout is set to 6 seconds. This should be enough time to import the block in the current
/// round and if not, the new round of the relay chain already started anyway.
pub struct WaitOnRelayChainBlock<B, BCE> {
block_chain_events: Arc<BCE>,
backend: Arc<B>,
}
impl<B, BCE> Clone for WaitOnRelayChainBlock<B, BCE> {
fn clone(&self) -> Self {
Self {
backend: self.backend.clone(),
block_chain_events: self.block_chain_events.clone(),
}
}
}
impl<B, BCE> WaitOnRelayChainBlock<B, BCE> {
/// Creates a new instance of `Self`.
pub fn new(backend: Arc<B>, block_chain_events: Arc<BCE>) -> Self {
Self {
backend,
block_chain_events,
}
}
}
impl<B, BCE> WaitOnRelayChainBlock<B, BCE>
where
B: Backend<PBlock>,
BCE: BlockchainEvents<PBlock>,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<B, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
{
pub fn wait_on_relay_chain_block(
&self,
hash: PHash,
) -> impl Future<Output = Result<(), Error>> {
let _lock = self.backend.get_import_lock().read();
match self.backend.blockchain().status(BlockId::Hash(hash)) {
Ok(BlockStatus::InChain) => {
return ready(Ok(())).boxed();
}
Err(err) => return ready(Err(Error::BlockchainError(hash, err))).boxed(),
_ => {}
}
let mut listener = self.block_chain_events.import_notification_stream();
// Now it is safe to drop the lock, even when the block is now imported, it should show
// up in our registered listener.
drop(_lock);
let mut timeout = futures_timer::Delay::new(Duration::from_secs(TIMEOUT_IN_SECONDS)).fuse();
async move {
loop {
futures::select! {
_ = timeout => return Err(Error::Timeout(hash)),
evt = listener.next() => match evt {
Some(evt) if evt.hash == hash => return Ok(()),
// Not the event we waited on.
Some(_) => continue,
None => return Err(Error::ImportListenerClosed(hash)),
}
}
}
}
.boxed()
}
}
#[cfg(test)]
mod tests {
use super::*;
use polkadot_test_client::{
construct_transfer_extrinsic, BlockBuilderExt, Client, ClientBlockImportExt,
DefaultTestClientBuilderExt, ExecutionStrategy, FullBackend, InitPolkadotBlockBuilder,
TestClientBuilder, TestClientBuilderExt,
};
use sp_consensus::BlockOrigin;
use sp_runtime::traits::Block as BlockT;
use futures::{executor::block_on, poll, task::Poll};
fn build_client_backend_and_block() -> (Arc<Client>, Arc<FullBackend>, PBlock) {
let builder =
TestClientBuilder::new().set_execution_strategy(ExecutionStrategy::NativeWhenPossible);
let backend = builder.backend();
let client = Arc::new(builder.build());
let block_builder = client.init_polkadot_block_builder();
let block = block_builder.build().expect("Finalizes the block").block;
(client, backend, block)
}
#[test]
fn returns_directly_for_available_block() {
let (mut client, backend, block) = build_client_backend_and_block();
let hash = block.hash();
client
.import(BlockOrigin::Own, block)
.expect("Imports the block");
let wait = WaitOnRelayChainBlock::new(backend, client);
block_on(async move {
// Should be ready on the first poll
assert!(matches!(
poll!(wait.wait_on_relay_chain_block(hash)),
Poll::Ready(Ok(()))
));
});
}
#[test]
fn resolve_after_block_import_notification_was_received() {
let (mut client, backend, block) = build_client_backend_and_block();
let hash = block.hash();
let wait = WaitOnRelayChainBlock::new(backend, client.clone());
block_on(async move {
let mut future = wait.wait_on_relay_chain_block(hash);
// As the block is not yet imported, the first poll should return `Pending`
assert!(poll!(&mut future).is_pending());
// Import the block that should fire the notification
client
.import(BlockOrigin::Own, block)
.expect("Imports the block");
// Now it should have received the notification and report that the block was imported
assert!(matches!(poll!(future), Poll::Ready(Ok(()))));
});
}
#[test]
fn wait_for_block_time_out_when_block_is_not_imported() {
let (client, backend, block) = build_client_backend_and_block();
let hash = block.hash();
let wait = WaitOnRelayChainBlock::new(backend, client.clone());
assert!(matches!(
block_on(wait.wait_on_relay_chain_block(hash)),
Err(Error::Timeout(_))
));
}
#[test]
fn do_not_resolve_after_different_block_import_notification_was_received() {
let (mut client, backend, block) = build_client_backend_and_block();
let hash = block.hash();
let ext = construct_transfer_extrinsic(
&*client,
sp_keyring::Sr25519Keyring::Alice,
sp_keyring::Sr25519Keyring::Bob,
1000,
);
let mut block_builder = client.init_polkadot_block_builder();
// Push an extrinsic to get a different block hash.
block_builder
.push_polkadot_extrinsic(ext)
.expect("Push extrinsic");
let block2 = block_builder.build().expect("Build second block").block;
let hash2 = block2.hash();
let wait = WaitOnRelayChainBlock::new(backend, client.clone());
block_on(async move {
let mut future = wait.wait_on_relay_chain_block(hash);
let mut future2 = wait.wait_on_relay_chain_block(hash2);
// As the block is not yet imported, the first poll should return `Pending`
assert!(poll!(&mut future).is_pending());
assert!(poll!(&mut future2).is_pending());
// Import the block that should fire the notification
client
.import(BlockOrigin::Own, block2)
.expect("Imports the second block");
// The import notification of the second block should not make this one finish
assert!(poll!(&mut future).is_pending());
// Now it should have received the notification and report that the block was imported
assert!(matches!(poll!(future2), Poll::Ready(Ok(()))));
client
.import(BlockOrigin::Own, block)
.expect("Imports the first block");
// Now it should be ready
assert!(matches!(poll!(future), Poll::Ready(Ok(()))));
});
}
}
+33
View File
@@ -0,0 +1,33 @@
[package]
name = "cumulus-client-service"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
# Cumulus dependencies
cumulus-client-consensus = { path = "../consensus" }
cumulus-client-collator = { path = "../collator" }
cumulus-primitives-core = { path = "../../primitives/core" }
# Substrate dependencies
sc-chain-spec = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot dependencies
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Other deps
futures = "0.3.6"
tracing = "0.1.22"
codec = { package = "parity-scale-codec", version = "2.0.0" }
+50
View File
@@ -0,0 +1,50 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use codec::Encode;
use sc_chain_spec::ChainSpec;
use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, Zero};
/// Generate the genesis block from a given ChainSpec.
pub fn generate_genesis_block<Block: BlockT>(
chain_spec: &Box<dyn ChainSpec>,
) -> Result<Block, String> {
let storage = chain_spec.build_storage()?;
let child_roots = storage.children_default.iter().map(|(sk, child_content)| {
let state_root = <<<Block as BlockT>::Header as HeaderT>::Hashing as HashT>::trie_root(
child_content.data.clone().into_iter().collect(),
);
(sk.clone(), state_root.encode())
});
let state_root = <<<Block as BlockT>::Header as HeaderT>::Hashing as HashT>::trie_root(
storage.top.clone().into_iter().chain(child_roots).collect(),
);
let extrinsics_root =
<<<Block as BlockT>::Header as HeaderT>::Hashing as HashT>::trie_root(Vec::new());
Ok(Block::new(
<<Block as BlockT>::Header as HeaderT>::new(
Zero::zero(),
extrinsics_root,
state_root,
Default::default(),
Default::default(),
),
Default::default(),
))
}
+347
View File
@@ -0,0 +1,347 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
//! Cumulus service
//!
//! Provides functions for starting a collator node or a normal full node.
use cumulus_primitives_core::ParaId;
use futures::{Future, FutureExt};
use polkadot_overseer::OverseerHandler;
use polkadot_primitives::v1::{Block as PBlock, CollatorId, CollatorPair};
use polkadot_service::{AbstractClient, Client as PClient, ClientHandle, RuntimeApiCollection};
use sc_client_api::{
Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, StateBackend, UsageProvider,
};
use sc_service::{error::Result as ServiceResult, Configuration, Role, TaskManager};
use sp_blockchain::HeaderBackend;
use sp_consensus::{BlockImport, Environment, Error as ConsensusError, Proposer};
use sp_core::traits::SpawnNamed;
use sp_inherents::InherentDataProviders;
use sp_runtime::traits::{BlakeTwo256, Block as BlockT};
use std::{marker::PhantomData, sync::Arc};
pub mod genesis;
/// Polkadot full node handles.
type PFullNode<C> = polkadot_service::NewFull<C>;
/// Parameters given to [`start_collator`].
pub struct StartCollatorParams<
'a,
Block: BlockT,
PF,
BI,
BS,
Client,
Backend,
Spawner,
PClient,
PBackend,
> {
pub proposer_factory: PF,
pub inherent_data_providers: InherentDataProviders,
pub backend: Arc<Backend>,
pub block_import: BI,
pub block_status: Arc<BS>,
pub client: Arc<Client>,
pub announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
pub spawner: Spawner,
pub para_id: ParaId,
pub collator_key: CollatorPair,
pub polkadot_full_node: PFullNode<PClient>,
pub task_manager: &'a mut TaskManager,
pub polkadot_backend: Arc<PBackend>,
}
/// Start a collator node for a parachain.
///
/// A collator is similar to a validator in a normal blockchain.
/// It is responsible for producing blocks and sending the blocks to a
/// parachain validator for validation and inclusion into the relay chain.
pub async fn start_collator<'a, Block, PF, BI, BS, Client, Backend, Spawner, PClient, PBackend>(
StartCollatorParams {
proposer_factory,
inherent_data_providers,
backend,
block_import,
block_status,
client,
announce_block,
spawner,
para_id,
collator_key,
polkadot_full_node,
task_manager,
polkadot_backend,
}: StartCollatorParams<'a, Block, PF, BI, BS, Client, Backend, Spawner, PClient, PBackend>,
) -> sc_service::error::Result<()>
where
Block: BlockT,
PF: Environment<Block> + Send + 'static,
BI: BlockImport<
Block,
Error = ConsensusError,
Transaction = <PF::Proposer as Proposer<Block>>::Transaction,
> + Send
+ Sync
+ 'static,
BS: BlockBackend<Block> + Send + Sync + 'static,
Client: Finalizer<Block, Backend>
+ UsageProvider<Block>
+ HeaderBackend<Block>
+ Send
+ Sync
+ BlockBackend<Block>
+ BlockchainEvents<Block>
+ 'static,
for<'b> &'b Client: BlockImport<Block>,
Backend: BackendT<Block> + 'static,
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
PClient: ClientHandle,
PBackend: BackendT<PBlock> + 'static,
PBackend::State: StateBackend<BlakeTwo256>,
{
polkadot_full_node.client.execute_with(StartConsensus {
para_id,
announce_block: announce_block.clone(),
client: client.clone(),
task_manager,
_phantom: PhantomData,
})?;
polkadot_full_node
.client
.execute_with(StartCollator {
proposer_factory,
inherent_data_providers,
backend,
announce_block,
overseer_handler: polkadot_full_node
.overseer_handler
.ok_or_else(|| "Polkadot full node did not provided an `OverseerHandler`!")?,
spawner,
para_id,
collator_key,
block_import,
block_status,
polkadot_backend,
})
.await?;
task_manager.add_child(polkadot_full_node.task_manager);
Ok(())
}
struct StartCollator<Block: BlockT, Backend, PF, BI, BS, Spawner, PBackend> {
proposer_factory: PF,
inherent_data_providers: InherentDataProviders,
backend: Arc<Backend>,
block_import: BI,
block_status: Arc<BS>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
overseer_handler: OverseerHandler,
spawner: Spawner,
para_id: ParaId,
collator_key: CollatorPair,
polkadot_backend: Arc<PBackend>,
}
impl<Block, Backend, PF, BI, BS, Spawner, PBackend2> polkadot_service::ExecuteWithClient
for StartCollator<Block, Backend, PF, BI, BS, Spawner, PBackend2>
where
Block: BlockT,
PF: Environment<Block> + Send + 'static,
BI: BlockImport<
Block,
Error = ConsensusError,
Transaction = <PF::Proposer as Proposer<Block>>::Transaction,
> + Send
+ Sync
+ 'static,
BS: BlockBackend<Block> + Send + Sync + 'static,
Backend: BackendT<Block> + 'static,
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
PBackend2: sc_client_api::Backend<PBlock> + 'static,
PBackend2::State: sp_api::StateBackend<BlakeTwo256>,
{
type Output = std::pin::Pin<Box<dyn Future<Output = ServiceResult<()>>>>;
fn execute_with_client<PClient, Api, PBackend>(self, client: Arc<PClient>) -> Self::Output
where
<Api as sp_api::ApiExt<PBlock>>::StateBackend: sp_api::StateBackend<BlakeTwo256>,
PBackend: sc_client_api::Backend<PBlock> + 'static,
PBackend::State: sp_api::StateBackend<BlakeTwo256>,
Api: RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: AbstractClient<PBlock, PBackend, Api = Api> + 'static,
{
async move {
cumulus_client_collator::start_collator(cumulus_client_collator::StartCollatorParams {
proposer_factory: self.proposer_factory,
inherent_data_providers: self.inherent_data_providers,
backend: self.backend,
block_import: self.block_import,
block_status: self.block_status,
announce_block: self.announce_block,
overseer_handler: self.overseer_handler,
spawner: self.spawner,
para_id: self.para_id,
key: self.collator_key,
polkadot_client: client,
polkadot_backend: self.polkadot_backend,
})
.await
.map_err(Into::into)
}
.boxed()
}
}
/// Parameters given to [`start_full_node`].
pub struct StartFullNodeParams<'a, Block: BlockT, Client, PClient> {
pub para_id: ParaId,
pub client: Arc<Client>,
pub polkadot_full_node: PFullNode<PClient>,
pub task_manager: &'a mut TaskManager,
pub announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
}
/// Start a full node for a parachain.
///
/// A full node will only sync the given parachain and will follow the
/// tip of the chain.
pub fn start_full_node<Block, Client, Backend, PClient>(
StartFullNodeParams {
client,
announce_block,
task_manager,
polkadot_full_node,
para_id,
}: StartFullNodeParams<Block, Client, PClient>,
) -> sc_service::error::Result<()>
where
Block: BlockT,
Client: Finalizer<Block, Backend>
+ UsageProvider<Block>
+ Send
+ Sync
+ BlockBackend<Block>
+ BlockchainEvents<Block>
+ 'static,
for<'a> &'a Client: BlockImport<Block>,
Backend: BackendT<Block> + 'static,
PClient: ClientHandle,
{
polkadot_full_node.client.execute_with(StartConsensus {
announce_block,
para_id,
client,
task_manager,
_phantom: PhantomData,
})?;
task_manager.add_child(polkadot_full_node.task_manager);
Ok(())
}
struct StartConsensus<'a, Block: BlockT, Client, Backend> {
para_id: ParaId,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
client: Arc<Client>,
task_manager: &'a mut TaskManager,
_phantom: PhantomData<Backend>,
}
impl<'a, Block, Client, Backend> polkadot_service::ExecuteWithClient
for StartConsensus<'a, Block, Client, Backend>
where
Block: BlockT,
Client: Finalizer<Block, Backend>
+ UsageProvider<Block>
+ Send
+ Sync
+ BlockBackend<Block>
+ BlockchainEvents<Block>
+ 'static,
for<'b> &'b Client: BlockImport<Block>,
Backend: BackendT<Block> + 'static,
{
type Output = ServiceResult<()>;
fn execute_with_client<PClient, Api, PBackend>(self, client: Arc<PClient>) -> Self::Output
where
<Api as sp_api::ApiExt<PBlock>>::StateBackend: sp_api::StateBackend<BlakeTwo256>,
PBackend: sc_client_api::Backend<PBlock>,
PBackend::State: sp_api::StateBackend<BlakeTwo256>,
Api: RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: AbstractClient<PBlock, PBackend, Api = Api> + 'static,
{
let consensus = cumulus_client_consensus::run_parachain_consensus(
self.para_id,
self.client,
client,
self.announce_block,
);
self.task_manager.spawn_essential_handle().spawn(
"cumulus-consensus",
consensus.then(|r| async move {
if let Err(e) = r {
tracing::error!(
target: "cumulus-service",
error = %e,
"Parachain consensus failed.",
)
}
}),
);
Ok(())
}
}
/// Prepare the parachain's node condifugration
///
/// This function will disable the default announcement of Substrate for the parachain in favor
/// of the one of Cumulus.
pub fn prepare_node_config(mut parachain_config: Configuration) -> Configuration {
parachain_config.announce_block = false;
parachain_config
}
/// Build the Polkadot full node using the given `config`.
#[sc_tracing::logging::prefix_logs_with("Relaychain")]
pub fn build_polkadot_full_node(
config: Configuration,
collator_id: CollatorId,
) -> Result<PFullNode<PClient>, polkadot_service::Error> {
let is_light = matches!(config.role, Role::Light);
if is_light {
Err(polkadot_service::Error::Sub(
"Light client not supported.".into(),
))
} else {
polkadot_service::build_full(
config,
polkadot_service::IsCollator::Yes(collator_id),
None,
None,
)
}
}