Introduce interface for relay chain interaction (#835)

This commit is contained in:
Sebastian Kunert
2021-12-22 19:02:11 +01:00
committed by GitHub
parent 7acfd4f80a
commit 876e594f59
30 changed files with 1512 additions and 1607 deletions
+66 -5
View File
@@ -1426,6 +1426,7 @@ dependencies = [
"cumulus-client-consensus-common",
"cumulus-client-network",
"cumulus-primitives-core",
"cumulus-relay-chain-interface",
"cumulus-test-client",
"cumulus-test-runtime",
"futures 0.3.19",
@@ -1455,7 +1456,6 @@ dependencies = [
"cumulus-primitives-core",
"futures 0.3.19",
"parity-scale-codec",
"polkadot-client",
"sc-client-api",
"sc-consensus",
"sc-consensus-aura",
@@ -1480,6 +1480,7 @@ name = "cumulus-client-consensus-common"
version = "0.1.0"
dependencies = [
"async-trait",
"cumulus-relay-chain-interface",
"cumulus-test-client",
"dyn-clone",
"futures 0.3.19",
@@ -1504,9 +1505,9 @@ dependencies = [
"async-trait",
"cumulus-client-consensus-common",
"cumulus-primitives-core",
"cumulus-relay-chain-interface",
"futures 0.3.19",
"parking_lot 0.10.2",
"polkadot-client",
"sc-client-api",
"sc-consensus",
"sp-api",
@@ -1524,17 +1525,21 @@ dependencies = [
name = "cumulus-client-network"
version = "0.1.0"
dependencies = [
"async-trait",
"cumulus-primitives-core",
"cumulus-relay-chain-interface",
"cumulus-relay-chain-local",
"cumulus-test-service",
"derive_more",
"futures 0.3.19",
"futures-timer 3.0.2",
"parity-scale-codec",
"parking_lot 0.10.2",
"parking_lot 0.11.2",
"polkadot-client",
"polkadot-node-primitives",
"polkadot-parachain",
"polkadot-primitives",
"polkadot-service",
"polkadot-test-client",
"sc-cli",
"sc-client-api",
@@ -1546,6 +1551,7 @@ dependencies = [
"sp-keyring",
"sp-keystore",
"sp-runtime",
"sp-state-machine",
"substrate-test-utils",
"tokio",
"tracing",
@@ -1556,6 +1562,7 @@ name = "cumulus-client-pov-recovery"
version = "0.1.0"
dependencies = [
"cumulus-primitives-core",
"cumulus-relay-chain-interface",
"cumulus-test-service",
"futures 0.3.19",
"futures-timer 3.0.2",
@@ -1586,11 +1593,11 @@ dependencies = [
"cumulus-client-consensus-common",
"cumulus-client-pov-recovery",
"cumulus-primitives-core",
"cumulus-relay-chain-interface",
"parity-scale-codec",
"parking_lot 0.10.2",
"polkadot-overseer",
"polkadot-primitives",
"polkadot-service",
"sc-chain-spec",
"sc-client-api",
"sc-consensus",
@@ -1777,9 +1784,9 @@ version = "0.1.0"
dependencies = [
"async-trait",
"cumulus-primitives-core",
"cumulus-relay-chain-interface",
"cumulus-test-relay-sproof-builder",
"parity-scale-codec",
"polkadot-client",
"sc-client-api",
"scale-info",
"sp-api",
@@ -1825,6 +1832,54 @@ dependencies = [
"xcm",
]
[[package]]
name = "cumulus-relay-chain-interface"
version = "0.1.0"
dependencies = [
"async-trait",
"cumulus-primitives-core",
"derive_more",
"parking_lot 0.11.2",
"polkadot-overseer",
"sc-client-api",
"sp-api",
"sp-blockchain",
"sp-core",
"sp-runtime",
"sp-state-machine",
]
[[package]]
name = "cumulus-relay-chain-local"
version = "0.1.0"
dependencies = [
"async-trait",
"cumulus-primitives-core",
"cumulus-relay-chain-interface",
"cumulus-test-service",
"futures 0.3.19",
"futures-timer 3.0.2",
"parking_lot 0.11.2",
"polkadot-client",
"polkadot-primitives",
"polkadot-service",
"polkadot-test-client",
"sc-client-api",
"sc-consensus-babe",
"sc-network",
"sc-service",
"sc-telemetry",
"sc-tracing",
"sp-api",
"sp-blockchain",
"sp-consensus",
"sp-core",
"sp-keyring",
"sp-runtime",
"sp-state-machine",
"tracing",
]
[[package]]
name = "cumulus-test-client"
version = "0.1.0"
@@ -1919,6 +1974,7 @@ dependencies = [
"cumulus-client-service",
"cumulus-primitives-core",
"cumulus-primitives-parachain-inherent",
"cumulus-relay-chain-local",
"cumulus-test-relay-validation-worker-provider",
"cumulus-test-runtime",
"frame-system",
@@ -1927,6 +1983,7 @@ dependencies = [
"jsonrpc-core",
"pallet-transaction-payment",
"parity-scale-codec",
"parking_lot 0.11.2",
"polkadot-primitives",
"polkadot-service",
"polkadot-test-service",
@@ -6026,6 +6083,8 @@ dependencies = [
"cumulus-client-service",
"cumulus-primitives-core",
"cumulus-primitives-parachain-inherent",
"cumulus-relay-chain-interface",
"cumulus-relay-chain-local",
"derive_more",
"frame-benchmarking",
"frame-benchmarking-cli",
@@ -6670,6 +6729,8 @@ dependencies = [
"cumulus-client-service",
"cumulus-primitives-core",
"cumulus-primitives-parachain-inherent",
"cumulus-relay-chain-interface",
"cumulus-relay-chain-local",
"frame-benchmarking",
"frame-benchmarking-cli",
"futures 0.3.19",
+1
View File
@@ -7,6 +7,7 @@ members = [
"client/network",
"client/pov-recovery",
"client/service",
"client/relay-chain-interface",
"pallets/aura-ext",
"pallets/collator-selection",
"pallets/dmp-queue",
+1
View File
@@ -22,6 +22,7 @@ polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot", bran
cumulus-client-network = { path = "../network" }
cumulus-client-consensus-common = { path = "../consensus/common" }
cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
# Other dependencies
codec = { package = "parity-scale-codec", version = "2.3.0", features = [ "derive" ] }
+4 -2
View File
@@ -17,7 +17,9 @@
//! Cumulus Collator implementation for Substrate.
use cumulus_client_network::WaitToAnnounce;
use cumulus_primitives_core::{CollectCollationInfo, ParachainBlockData, PersistedValidationData};
use cumulus_primitives_core::{
relay_chain::Hash as PHash, CollectCollationInfo, ParachainBlockData, PersistedValidationData,
};
use sc_client_api::BlockBackend;
use sp_api::ProvideRuntimeApi;
@@ -34,7 +36,7 @@ use polkadot_node_primitives::{
};
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::v1::{CollatorPair, Hash as PHash, HeadData, Id as ParaId};
use polkadot_primitives::v1::{CollatorPair, HeadData, Id as ParaId};
use codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt};
-3
View File
@@ -24,9 +24,6 @@ sc-telemetry = { git = "https://github.com/paritytech/substrate", branch = "mast
sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot dependencies
polkadot-client = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Cumulus dependencies
cumulus-client-consensus-common = { path = "../common" }
cumulus-primitives-core = { path = "../../../primitives/core" }
+31 -292
View File
@@ -26,13 +26,10 @@ use codec::{Decode, Encode};
use cumulus_client_consensus_common::{
ParachainBlockImport, ParachainCandidate, ParachainConsensus,
};
use cumulus_primitives_core::{
relay_chain::v1::{Block as PBlock, Hash as PHash, ParachainHost},
PersistedValidationData,
};
use cumulus_primitives_core::{relay_chain::v1::Hash as PHash, PersistedValidationData};
use futures::lock::Mutex;
use polkadot_client::ClientHandle;
use sc_client_api::{backend::AuxStore, Backend, BlockOf};
use sc_client_api::{backend::AuxStore, BlockOf};
use sc_consensus::BlockImport;
use sc_consensus_slots::{BackoffAuthoringBlocksStrategy, SlotInfo};
use sc_telemetry::TelemetryHandle;
@@ -46,8 +43,8 @@ use sp_consensus_aura::AuraApi;
use sp_core::crypto::Pair;
use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
use sp_keystore::SyncCryptoStorePtr;
use sp_runtime::traits::{Block as BlockT, HashFor, Header as HeaderT, Member, NumberFor};
use std::{convert::TryFrom, hash::Hash, marker::PhantomData, sync::Arc};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, NumberFor};
use std::{convert::TryFrom, hash::Hash, sync::Arc};
mod import_queue;
@@ -60,10 +57,8 @@ pub use sc_consensus_slots::InherentDataProviderExt;
const LOG_TARGET: &str = "aura::cumulus";
/// The implementation of the AURA consensus for parachains.
pub struct AuraConsensus<B, RClient, RBackend, CIDP> {
pub struct AuraConsensus<B, CIDP> {
create_inherent_data_providers: Arc<CIDP>,
relay_chain_client: Arc<RClient>,
relay_chain_backend: Arc<RBackend>,
aura_worker: Arc<
Mutex<
dyn sc_consensus_slots::SlotWorker<B, <EnableProofRecording as ProofRecording>::Proof>
@@ -74,44 +69,39 @@ pub struct AuraConsensus<B, RClient, RBackend, CIDP> {
slot_duration: SlotDuration,
}
impl<B, RClient, RBackend, CIDP> Clone for AuraConsensus<B, RClient, RBackend, CIDP> {
impl<B, CIDP> Clone for AuraConsensus<B, CIDP> {
fn clone(&self) -> Self {
Self {
create_inherent_data_providers: self.create_inherent_data_providers.clone(),
relay_chain_backend: self.relay_chain_backend.clone(),
relay_chain_client: self.relay_chain_client.clone(),
aura_worker: self.aura_worker.clone(),
slot_duration: self.slot_duration,
}
}
}
impl<B, RClient, RBackend, CIDP> AuraConsensus<B, RClient, RBackend, CIDP>
impl<B, CIDP> AuraConsensus<B, CIDP>
where
B: BlockT,
RClient: ProvideRuntimeApi<PBlock>,
RClient::Api: ParachainHost<PBlock>,
RBackend: Backend<PBlock>,
CIDP: CreateInherentDataProviders<B, (PHash, PersistedValidationData)>,
CIDP: CreateInherentDataProviders<B, (PHash, PersistedValidationData)> + 'static,
CIDP::InherentDataProviders: InherentDataProviderExt,
{
/// Create a new instance of AURA consensus.
pub fn new<P, Client, BI, SO, PF, BS, Error>(
para_client: Arc<Client>,
block_import: BI,
sync_oracle: SO,
proposer_factory: PF,
force_authoring: bool,
backoff_authoring_blocks: Option<BS>,
keystore: SyncCryptoStorePtr,
create_inherent_data_providers: CIDP,
polkadot_client: Arc<RClient>,
polkadot_backend: Arc<RBackend>,
slot_duration: SlotDuration,
telemetry: Option<TelemetryHandle>,
block_proposal_slot_portion: SlotProportion,
max_block_proposal_slot_portion: Option<SlotProportion>,
) -> Self
/// Create a new boxed instance of AURA consensus.
pub fn build<P, Client, BI, SO, PF, BS, Error>(
BuildAuraConsensusParams {
proposer_factory,
create_inherent_data_providers,
block_import,
para_client,
backoff_authoring_blocks,
sync_oracle,
keystore,
force_authoring,
slot_duration,
telemetry,
block_proposal_slot_portion,
max_block_proposal_slot_portion,
}: BuildAuraConsensusParams<PF, BI, CIDP, Client, BS, SO>,
) -> Box<dyn ParachainConsensus<B>>
where
Client:
ProvideRuntimeApi<B> + BlockOf + AuxStore + HeaderBackend<B> + Send + Sync + 'static,
@@ -148,13 +138,11 @@ where
},
);
Self {
Box::new(Self {
create_inherent_data_providers: Arc::new(create_inherent_data_providers),
relay_chain_backend: polkadot_backend,
relay_chain_client: polkadot_client,
aura_worker: Arc::new(Mutex::new(worker)),
slot_duration,
}
})
}
/// Create the inherent data.
@@ -194,13 +182,10 @@ where
}
#[async_trait::async_trait]
impl<B, RClient, RBackend, CIDP> ParachainConsensus<B> for AuraConsensus<B, RClient, RBackend, CIDP>
impl<B, CIDP> ParachainConsensus<B> for AuraConsensus<B, CIDP>
where
B: BlockT,
RClient: ProvideRuntimeApi<PBlock> + Send + Sync,
RClient::Api: ParachainHost<PBlock>,
RBackend: Backend<PBlock>,
CIDP: CreateInherentDataProviders<B, (PHash, PersistedValidationData)> + Send + Sync,
CIDP: CreateInherentDataProviders<B, (PHash, PersistedValidationData)> + Send + Sync + 'static,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
{
async fn produce_candidate(
@@ -232,12 +217,10 @@ where
}
/// Paramaters of [`build_aura_consensus`].
pub struct BuildAuraConsensusParams<PF, BI, RBackend, CIDP, Client, BS, SO> {
pub struct BuildAuraConsensusParams<PF, BI, CIDP, Client, BS, SO> {
pub proposer_factory: PF,
pub create_inherent_data_providers: CIDP,
pub block_import: BI,
pub relay_chain_client: polkadot_client::Client,
pub relay_chain_backend: Arc<RBackend>,
pub para_client: Arc<Client>,
pub backoff_authoring_blocks: Option<BS>,
pub sync_oracle: SO,
@@ -248,247 +231,3 @@ pub struct BuildAuraConsensusParams<PF, BI, RBackend, CIDP, Client, BS, SO> {
pub block_proposal_slot_portion: SlotProportion,
pub max_block_proposal_slot_portion: Option<SlotProportion>,
}
/// Build the [`AuraConsensus`].
///
/// Returns a boxed [`ParachainConsensus`].
pub fn build_aura_consensus<P, Block, PF, BI, RBackend, CIDP, Client, SO, BS, Error>(
BuildAuraConsensusParams {
proposer_factory,
create_inherent_data_providers,
block_import,
relay_chain_client,
relay_chain_backend,
para_client,
backoff_authoring_blocks,
sync_oracle,
keystore,
force_authoring,
slot_duration,
telemetry,
block_proposal_slot_portion,
max_block_proposal_slot_portion,
}: BuildAuraConsensusParams<PF, BI, RBackend, CIDP, Client, BS, SO>,
) -> Box<dyn ParachainConsensus<Block>>
where
Block: BlockT,
RBackend: Backend<PBlock> + 'static,
CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)>
+ Send
+ Sync
+ 'static,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
Client: ProvideRuntimeApi<Block>
+ BlockOf
+ AuxStore
+ HeaderBackend<Block>
+ Send
+ Sync
+ 'static,
Client::Api: AuraApi<Block, P::Public>,
BI: BlockImport<Block, Transaction = sp_api::TransactionFor<Client, Block>>
+ Send
+ Sync
+ 'static,
SO: SyncOracle + Send + Sync + Clone + 'static,
BS: BackoffAuthoringBlocksStrategy<NumberFor<Block>> + Send + Sync + 'static,
PF: Environment<Block, Error = Error> + Send + Sync + 'static,
PF::Proposer: Proposer<
Block,
Error = Error,
Transaction = sp_api::TransactionFor<Client, Block>,
ProofRecording = EnableProofRecording,
Proof = <EnableProofRecording as ProofRecording>::Proof,
>,
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
P: Pair + Send + Sync,
P::Public: AppPublic + Hash + Member + Encode + Decode,
P::Signature: TryFrom<Vec<u8>> + Hash + Member + Encode + Decode,
{
AuraConsensusBuilder::<P, _, _, _, _, _, _, _, _, _>::new(
proposer_factory,
block_import,
create_inherent_data_providers,
relay_chain_client,
relay_chain_backend,
para_client,
backoff_authoring_blocks,
sync_oracle,
force_authoring,
keystore,
slot_duration,
telemetry,
block_proposal_slot_portion,
max_block_proposal_slot_portion,
)
.build()
}
/// Aura consensus builder.
///
/// Builds a [`AuraConsensus`] for a parachain. As this requires
/// a concrete relay chain client instance, the builder takes a [`polkadot_client::Client`]
/// that wraps this concrete instance. By using [`polkadot_client::ExecuteWithClient`]
/// the builder gets access to this concrete instance.
struct AuraConsensusBuilder<P, Block, PF, BI, RBackend, CIDP, Client, SO, BS, Error> {
_phantom: PhantomData<(Block, Error, P)>,
proposer_factory: PF,
create_inherent_data_providers: CIDP,
block_import: BI,
relay_chain_backend: Arc<RBackend>,
relay_chain_client: polkadot_client::Client,
para_client: Arc<Client>,
backoff_authoring_blocks: Option<BS>,
sync_oracle: SO,
force_authoring: bool,
keystore: SyncCryptoStorePtr,
slot_duration: SlotDuration,
telemetry: Option<TelemetryHandle>,
block_proposal_slot_portion: SlotProportion,
max_block_proposal_slot_portion: Option<SlotProportion>,
}
impl<Block, PF, BI, RBackend, CIDP, Client, SO, BS, P, Error>
AuraConsensusBuilder<P, Block, PF, BI, RBackend, CIDP, Client, SO, BS, Error>
where
Block: BlockT,
RBackend: Backend<PBlock> + 'static,
CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)>
+ Send
+ Sync
+ 'static,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
Client: ProvideRuntimeApi<Block>
+ BlockOf
+ AuxStore
+ HeaderBackend<Block>
+ Send
+ Sync
+ 'static,
Client::Api: AuraApi<Block, P::Public>,
BI: BlockImport<Block, Transaction = sp_api::TransactionFor<Client, Block>>
+ Send
+ Sync
+ 'static,
SO: SyncOracle + Send + Sync + Clone + 'static,
BS: BackoffAuthoringBlocksStrategy<NumberFor<Block>> + Send + Sync + 'static,
PF: Environment<Block, Error = Error> + Send + Sync + 'static,
PF::Proposer: Proposer<
Block,
Error = Error,
Transaction = sp_api::TransactionFor<Client, Block>,
ProofRecording = EnableProofRecording,
Proof = <EnableProofRecording as ProofRecording>::Proof,
>,
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
P: Pair + Send + Sync,
P::Public: AppPublic + Hash + Member + Encode + Decode,
P::Signature: TryFrom<Vec<u8>> + Hash + Member + Encode + Decode,
{
/// Create a new instance of the builder.
fn new(
proposer_factory: PF,
block_import: BI,
create_inherent_data_providers: CIDP,
relay_chain_client: polkadot_client::Client,
relay_chain_backend: Arc<RBackend>,
para_client: Arc<Client>,
backoff_authoring_blocks: Option<BS>,
sync_oracle: SO,
force_authoring: bool,
keystore: SyncCryptoStorePtr,
slot_duration: SlotDuration,
telemetry: Option<TelemetryHandle>,
block_proposal_slot_portion: SlotProportion,
max_block_proposal_slot_portion: Option<SlotProportion>,
) -> Self {
Self {
_phantom: PhantomData,
proposer_factory,
block_import,
create_inherent_data_providers,
relay_chain_backend,
relay_chain_client,
para_client,
backoff_authoring_blocks,
sync_oracle,
force_authoring,
keystore,
slot_duration,
telemetry,
block_proposal_slot_portion,
max_block_proposal_slot_portion,
}
}
/// Build the relay chain consensus.
fn build(self) -> Box<dyn ParachainConsensus<Block>> {
self.relay_chain_client.clone().execute_with(self)
}
}
impl<Block, PF, BI, RBackend, CIDP, Client, SO, BS, P, Error> polkadot_client::ExecuteWithClient
for AuraConsensusBuilder<P, Block, PF, BI, RBackend, CIDP, Client, SO, BS, Error>
where
Block: BlockT,
RBackend: Backend<PBlock> + 'static,
CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)>
+ Send
+ Sync
+ 'static,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
Client: ProvideRuntimeApi<Block>
+ BlockOf
+ AuxStore
+ HeaderBackend<Block>
+ Send
+ Sync
+ 'static,
Client::Api: AuraApi<Block, P::Public>,
BI: BlockImport<Block, Transaction = sp_api::TransactionFor<Client, Block>>
+ Send
+ Sync
+ 'static,
SO: SyncOracle + Send + Sync + Clone + 'static,
BS: BackoffAuthoringBlocksStrategy<NumberFor<Block>> + Send + Sync + 'static,
PF: Environment<Block, Error = Error> + Send + Sync + 'static,
PF::Proposer: Proposer<
Block,
Error = Error,
Transaction = sp_api::TransactionFor<Client, Block>,
ProofRecording = EnableProofRecording,
Proof = <EnableProofRecording as ProofRecording>::Proof,
>,
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
P: Pair + Send + Sync,
P::Public: AppPublic + Hash + Member + Encode + Decode,
P::Signature: TryFrom<Vec<u8>> + Hash + Member + Encode + Decode,
{
type Output = Box<dyn ParachainConsensus<Block>>;
fn execute_with_client<PClient, Api, PBackend>(self, client: Arc<PClient>) -> Self::Output
where
<Api as sp_api::ApiExt<PBlock>>::StateBackend: sp_api::StateBackend<HashFor<PBlock>>,
PBackend: Backend<PBlock>,
PBackend::State: sp_api::StateBackend<sp_runtime::traits::BlakeTwo256>,
Api: polkadot_client::RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: polkadot_client::AbstractClient<PBlock, PBackend, Api = Api> + 'static,
{
Box::new(AuraConsensus::new::<P, _, _, _, _, _, _>(
self.para_client,
self.block_import,
self.sync_oracle,
self.proposer_factory,
self.force_authoring,
self.backoff_authoring_blocks,
self.keystore,
self.create_inherent_data_providers,
client.clone(),
self.relay_chain_backend,
self.slot_duration,
self.telemetry,
self.block_proposal_slot_portion,
self.max_block_proposal_slot_portion,
))
}
}
@@ -18,6 +18,9 @@ sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot deps
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Cumulus deps
cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }
# Other deps
futures = { version = "0.3.8", features = ["compat"] }
codec = { package = "parity-scale-codec", version = "2.3.0", features = [ "derive" ] }
@@ -14,11 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use cumulus_relay_chain_interface::RelayChainInterface;
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::{
@@ -26,9 +26,7 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT},
};
use polkadot_primitives::v1::{
Block as PBlock, Id as ParaId, OccupiedCoreAssumption, ParachainHost,
};
use polkadot_primitives::v1::{Block as PBlock, Id as ParaId, OccupiedCoreAssumption};
use codec::Decode;
use futures::{future, select, FutureExt, Stream, StreamExt};
@@ -370,10 +368,9 @@ where
}
}
impl<T> RelaychainClient for Arc<T>
impl<RCInterface> RelaychainClient for RCInterface
where
T: sc_client_api::BlockchainEvents<PBlock> + ProvideRuntimeApi<PBlock> + 'static + Send + Sync,
<T as ProvideRuntimeApi<PBlock>>::Api: ParachainHost<PBlock>,
RCInterface: RelayChainInterface + Clone + 'static,
{
type Error = ClientError;
@@ -410,8 +407,7 @@ where
at: &BlockId<PBlock>,
para_id: ParaId,
) -> ClientResult<Option<Vec<u8>>> {
self.runtime_api()
.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
self.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
.map(|s| s.map(|s| s.parent_head.0))
.map_err(Into::into)
}
@@ -19,11 +19,11 @@ sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "mast
substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot dependencies
polkadot-client = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Cumulus dependencies
cumulus-client-consensus-common = { path = "../common" }
cumulus-primitives-core = { path = "../../../primitives/core" }
cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }
# Other deps
futures = { version = "0.3.8", features = ["compat"] }
+27 -134
View File
@@ -36,20 +36,16 @@
use cumulus_client_consensus_common::{
ParachainBlockImport, ParachainCandidate, ParachainConsensus,
};
use cumulus_primitives_core::{
relay_chain::v1::{Block as PBlock, Hash as PHash, ParachainHost},
ParaId, PersistedValidationData,
};
use cumulus_primitives_core::{relay_chain::v1::Hash as PHash, ParaId, PersistedValidationData};
use cumulus_relay_chain_interface::RelayChainInterface;
use parking_lot::Mutex;
use polkadot_client::ClientHandle;
use sc_client_api::Backend;
use sc_consensus::{BlockImport, BlockImportParams};
use sp_api::ProvideRuntimeApi;
use sp_consensus::{
BlockOrigin, EnableProofRecording, Environment, ProofRecording, Proposal, Proposer,
};
use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
use sp_runtime::traits::{Block as BlockT, HashFor, Header as HeaderT};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use std::{marker::PhantomData, sync::Arc, time::Duration};
mod import_queue;
@@ -58,18 +54,18 @@ pub use import_queue::{import_queue, Verifier};
const LOG_TARGET: &str = "cumulus-consensus-relay-chain";
/// The implementation of the relay-chain provided consensus for parachains.
pub struct RelayChainConsensus<B, PF, BI, RClient, RBackend, CIDP> {
pub struct RelayChainConsensus<B, PF, BI, RCInterface, CIDP> {
para_id: ParaId,
_phantom: PhantomData<B>,
proposer_factory: Arc<Mutex<PF>>,
create_inherent_data_providers: Arc<CIDP>,
block_import: Arc<futures::lock::Mutex<ParachainBlockImport<BI>>>,
relay_chain_client: Arc<RClient>,
relay_chain_backend: Arc<RBackend>,
relay_chain_interface: RCInterface,
}
impl<B, PF, BI, RClient, RBackend, CIDP> Clone
for RelayChainConsensus<B, PF, BI, RClient, RBackend, CIDP>
impl<B, PF, BI, RCInterface, CIDP> Clone for RelayChainConsensus<B, PF, BI, RCInterface, CIDP>
where
RCInterface: Clone,
{
fn clone(&self) -> Self {
Self {
@@ -78,18 +74,15 @@ impl<B, PF, BI, RClient, RBackend, CIDP> Clone
proposer_factory: self.proposer_factory.clone(),
create_inherent_data_providers: self.create_inherent_data_providers.clone(),
block_import: self.block_import.clone(),
relay_chain_backend: self.relay_chain_backend.clone(),
relay_chain_client: self.relay_chain_client.clone(),
relay_chain_interface: self.relay_chain_interface.clone(),
}
}
}
impl<B, PF, BI, RClient, RBackend, CIDP> RelayChainConsensus<B, PF, BI, RClient, RBackend, CIDP>
impl<B, PF, BI, RCInterface, CIDP> RelayChainConsensus<B, PF, BI, RCInterface, CIDP>
where
B: BlockT,
RClient: ProvideRuntimeApi<PBlock>,
RClient::Api: ParachainHost<PBlock>,
RBackend: Backend<PBlock>,
RCInterface: RelayChainInterface,
CIDP: CreateInherentDataProviders<B, (PHash, PersistedValidationData)>,
{
/// Create a new instance of relay-chain provided consensus.
@@ -98,8 +91,7 @@ where
proposer_factory: PF,
create_inherent_data_providers: CIDP,
block_import: BI,
polkadot_client: Arc<RClient>,
polkadot_backend: Arc<RBackend>,
relay_chain_interface: RCInterface,
) -> Self {
Self {
para_id,
@@ -108,8 +100,7 @@ where
block_import: Arc::new(futures::lock::Mutex::new(ParachainBlockImport::new(
block_import,
))),
relay_chain_backend: polkadot_backend,
relay_chain_client: polkadot_client,
relay_chain_interface,
_phantom: PhantomData,
}
}
@@ -148,13 +139,11 @@ where
}
#[async_trait::async_trait]
impl<B, PF, BI, RClient, RBackend, CIDP> ParachainConsensus<B>
for RelayChainConsensus<B, PF, BI, RClient, RBackend, CIDP>
impl<B, PF, BI, RCInterface, CIDP> ParachainConsensus<B>
for RelayChainConsensus<B, PF, BI, RCInterface, CIDP>
where
B: BlockT,
RClient: ProvideRuntimeApi<PBlock> + Send + Sync,
RClient::Api: ParachainHost<PBlock>,
RBackend: Backend<PBlock>,
RCInterface: RelayChainInterface + Clone,
BI: BlockImport<B> + Send + Sync,
PF: Environment<B> + Send + Sync,
PF::Proposer: Proposer<
@@ -229,27 +218,25 @@ where
}
/// Paramaters of [`build_relay_chain_consensus`].
pub struct BuildRelayChainConsensusParams<PF, BI, RBackend, CIDP> {
pub struct BuildRelayChainConsensusParams<PF, BI, CIDP, RCInterface> {
pub para_id: ParaId,
pub proposer_factory: PF,
pub create_inherent_data_providers: CIDP,
pub block_import: BI,
pub relay_chain_client: polkadot_client::Client,
pub relay_chain_backend: Arc<RBackend>,
pub relay_chain_interface: RCInterface,
}
/// Build the [`RelayChainConsensus`].
///
/// Returns a boxed [`ParachainConsensus`].
pub fn build_relay_chain_consensus<Block, PF, BI, RBackend, CIDP>(
pub fn build_relay_chain_consensus<Block, PF, BI, CIDP, RCInterface>(
BuildRelayChainConsensusParams {
para_id,
proposer_factory,
create_inherent_data_providers,
block_import,
relay_chain_client,
relay_chain_backend,
}: BuildRelayChainConsensusParams<PF, BI, RBackend, CIDP>,
relay_chain_interface,
}: BuildRelayChainConsensusParams<PF, BI, CIDP, RCInterface>,
) -> Box<dyn ParachainConsensus<Block>>
where
Block: BlockT,
@@ -261,108 +248,14 @@ where
Proof = <EnableProofRecording as ProofRecording>::Proof,
>,
BI: BlockImport<Block> + Send + Sync + 'static,
RBackend: Backend<PBlock> + 'static,
CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)> + 'static,
RCInterface: RelayChainInterface + Clone + 'static,
{
RelayChainConsensusBuilder::new(
Box::new(RelayChainConsensus::new(
para_id,
proposer_factory,
block_import,
create_inherent_data_providers,
relay_chain_client,
relay_chain_backend,
)
.build()
}
/// Relay chain consensus builder.
///
/// Builds a [`RelayChainConsensus`] for a parachain. As this requires
/// a concrete relay chain client instance, the builder takes a [`polkadot_client::Client`]
/// that wraps this concrete instanace. By using [`polkadot_client::ExecuteWithClient`]
/// the builder gets access to this concrete instance.
struct RelayChainConsensusBuilder<Block, PF, BI, RBackend, CIDP> {
para_id: ParaId,
_phantom: PhantomData<Block>,
proposer_factory: PF,
create_inherent_data_providers: CIDP,
block_import: BI,
relay_chain_backend: Arc<RBackend>,
relay_chain_client: polkadot_client::Client,
}
impl<Block, PF, BI, RBackend, CIDP> RelayChainConsensusBuilder<Block, PF, BI, RBackend, CIDP>
where
Block: BlockT,
PF: Environment<Block> + Send + Sync + 'static,
PF::Proposer: Proposer<
Block,
Transaction = BI::Transaction,
ProofRecording = EnableProofRecording,
Proof = <EnableProofRecording as ProofRecording>::Proof,
>,
BI: BlockImport<Block> + Send + Sync + 'static,
RBackend: Backend<PBlock> + 'static,
CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)> + 'static,
{
/// Create a new instance of the builder.
fn new(
para_id: ParaId,
proposer_factory: PF,
block_import: BI,
create_inherent_data_providers: CIDP,
relay_chain_client: polkadot_client::Client,
relay_chain_backend: Arc<RBackend>,
) -> Self {
Self {
para_id,
_phantom: PhantomData,
proposer_factory,
block_import,
create_inherent_data_providers,
relay_chain_backend,
relay_chain_client,
}
}
/// Build the relay chain consensus.
fn build(self) -> Box<dyn ParachainConsensus<Block>> {
self.relay_chain_client.clone().execute_with(self)
}
}
impl<Block, PF, BI, RBackend, CIDP> polkadot_client::ExecuteWithClient
for RelayChainConsensusBuilder<Block, PF, BI, RBackend, CIDP>
where
Block: BlockT,
PF: Environment<Block> + Send + Sync + 'static,
PF::Proposer: Proposer<
Block,
Transaction = BI::Transaction,
ProofRecording = EnableProofRecording,
Proof = <EnableProofRecording as ProofRecording>::Proof,
>,
BI: BlockImport<Block> + Send + Sync + 'static,
RBackend: Backend<PBlock> + 'static,
CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)> + 'static,
{
type Output = Box<dyn ParachainConsensus<Block>>;
fn execute_with_client<PClient, Api, PBackend>(self, client: Arc<PClient>) -> Self::Output
where
<Api as sp_api::ApiExt<PBlock>>::StateBackend: sp_api::StateBackend<HashFor<PBlock>>,
PBackend: Backend<PBlock>,
PBackend::State: sp_api::StateBackend<sp_runtime::traits::BlakeTwo256>,
Api: polkadot_client::RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: polkadot_client::AbstractClient<PBlock, PBackend, Api = Api> + 'static,
{
Box::new(RelayChainConsensus::new(
self.para_id,
self.proposer_factory,
self.create_inherent_data_providers,
self.block_import,
client.clone(),
self.relay_chain_backend,
))
}
block_import,
relay_chain_interface,
))
}
+8 -2
View File
@@ -13,20 +13,23 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot deps
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-node-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-client = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-parachain = { git = "https://github.com/paritytech/polkadot", branch = "master" }
cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
# other deps
codec = { package = "parity-scale-codec", version = "2.3.0", features = [ "derive" ] }
futures = { version = "0.3.1", features = ["compat"] }
futures-timer = "3.0.2"
tracing = "0.1.22"
parking_lot = "0.10.2"
parking_lot = "0.11.1"
derive_more = "0.99.2"
async-trait = "0.1.52"
[dev-dependencies]
tokio = { version = "1.10", features = ["macros"] }
@@ -34,9 +37,12 @@ tokio = { version = "1.10", features = ["macros"] }
# Cumulus deps
cumulus-test-service = { path = "../../test/service" }
cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-relay-chain-local = { path = "../relay-chain-local" }
# Polkadot deps
polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-client = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# substrate deps
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
+37 -153
View File
@@ -20,12 +20,8 @@
//! that use the relay chain provided consensus. See [`BlockAnnounceValidator`]
//! and [`WaitToAnnounce`] for more information about this implementation.
use sc_client_api::{Backend, BlockchainEvents};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus::{
block_validation::{BlockAnnounceValidator as BlockAnnounceValidatorT, Validation},
SyncOracle,
use sp_consensus::block_validation::{
BlockAnnounceValidator as BlockAnnounceValidatorT, Validation,
};
use sp_core::traits::SpawnNamed;
use sp_runtime::{
@@ -33,12 +29,12 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT},
};
use polkadot_client::ClientHandle;
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_node_primitives::{CollationSecondedSignal, Statement};
use polkadot_parachain::primitives::HeadData;
use polkadot_primitives::v1::{
Block as PBlock, CandidateReceipt, CompactStatement, Hash as PHash, Id as ParaId,
OccupiedCoreAssumption, ParachainHost, SigningContext, UncheckedSigned,
OccupiedCoreAssumption, SigningContext, UncheckedSigned,
};
use codec::{Decode, DecodeAll, Encode};
@@ -50,11 +46,8 @@ use futures::{
use std::{convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc};
use wait_on_relay_chain_block::WaitOnRelayChainBlock;
#[cfg(test)]
mod tests;
mod wait_on_relay_chain_block;
const LOG_TARGET: &str = "sync::cumulus";
@@ -135,19 +128,18 @@ impl BlockAnnounceData {
/// Check the signature of the statement.
///
/// Returns an `Err(_)` if it failed.
fn check_signature<P>(
fn check_signature<RCInterface>(
self,
relay_chain_client: &Arc<P>,
relay_chain_client: &RCInterface,
) -> Result<Validation, BlockAnnounceError>
where
P: ProvideRuntimeApi<PBlock> + Send + Sync + 'static,
P::Api: ParachainHost<PBlock>,
RCInterface: RelayChainInterface + 'static,
{
let runtime_api = relay_chain_client.runtime_api();
let validator_index = self.statement.unchecked_validator_index();
let runtime_api_block_id = BlockId::Hash(self.relay_parent);
let session_index = match runtime_api.session_index_for_child(&runtime_api_block_id) {
let session_index = match relay_chain_client.session_index_for_child(&runtime_api_block_id)
{
Ok(r) => r,
Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
};
@@ -155,7 +147,7 @@ impl BlockAnnounceData {
let signing_context = SigningContext { parent_hash: self.relay_parent, session_index };
// Check that the signer is a legit validator.
let authorities = match runtime_api.validators(&runtime_api_block_id) {
let authorities = match relay_chain_client.validators(&runtime_api_block_id) {
Ok(r) => r,
Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
};
@@ -230,52 +222,37 @@ impl TryFrom<&'_ CollationSecondedSignal> for BlockAnnounceData {
/// chain. If it is at the tip, it is required to provide a justification or otherwise we reject
/// it. However, if the announcement is for a block below the tip the announcement is accepted
/// as it probably comes from a node that is currently syncing the chain.
pub struct BlockAnnounceValidator<Block, R, B, BCE> {
pub struct BlockAnnounceValidator<Block, RCInterface> {
phantom: PhantomData<Block>,
relay_chain_client: Arc<R>,
relay_chain_backend: Arc<B>,
relay_chain_interface: RCInterface,
para_id: ParaId,
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
wait_on_relay_chain_block: WaitOnRelayChainBlock<B, BCE>,
}
impl<Block, R, B, BCE> BlockAnnounceValidator<Block, R, B, BCE> {
impl<Block, RCInterface> BlockAnnounceValidator<Block, RCInterface>
where
RCInterface: Clone,
{
/// Create a new [`BlockAnnounceValidator`].
pub fn new(
relay_chain_client: Arc<R>,
para_id: ParaId,
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_backend: Arc<B>,
relay_chain_blockchain_events: Arc<BCE>,
) -> Self {
pub fn new(relay_chain_interface: RCInterface, para_id: ParaId) -> Self {
Self {
phantom: Default::default(),
relay_chain_client,
relay_chain_interface: relay_chain_interface.clone(),
para_id,
relay_chain_sync_oracle,
relay_chain_backend: relay_chain_backend.clone(),
wait_on_relay_chain_block: WaitOnRelayChainBlock::new(
relay_chain_backend,
relay_chain_blockchain_events,
),
}
}
}
impl<Block: BlockT, R, B, BCE> BlockAnnounceValidator<Block, R, B, BCE>
impl<Block: BlockT, RCInterface> BlockAnnounceValidator<Block, RCInterface>
where
R: ProvideRuntimeApi<PBlock> + Send + Sync + 'static,
R::Api: ParachainHost<PBlock>,
B: Backend<PBlock> + 'static,
RCInterface: RelayChainInterface + Clone,
{
/// Get the included block of the given parachain in the relay chain.
fn included_block(
relay_chain_client: &R,
relay_chain_interface: &RCInterface,
block_id: &BlockId<PBlock>,
para_id: ParaId,
) -> Result<Block::Header, BoxedError> {
let validation_data = relay_chain_client
.runtime_api()
let validation_data = relay_chain_interface
.persisted_validation_data(block_id, para_id, OccupiedCoreAssumption::TimedOut)
.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?
.ok_or_else(|| {
@@ -293,12 +270,11 @@ where
/// Get the backed block hash of the given parachain in the relay chain.
fn backed_block_hash(
relay_chain_client: &R,
relay_chain_interface: &RCInterface,
block_id: &BlockId<PBlock>,
para_id: ParaId,
) -> Result<Option<PHash>, BoxedError> {
let candidate_receipt = relay_chain_client
.runtime_api()
let candidate_receipt = relay_chain_interface
.candidate_pending_availability(block_id, para_id)
.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
@@ -310,21 +286,20 @@ where
&self,
header: Block::Header,
) -> impl Future<Output = Result<Validation, BoxedError>> {
let relay_chain_client = self.relay_chain_client.clone();
let relay_chain_backend = self.relay_chain_backend.clone();
let relay_chain_interface = self.relay_chain_interface.clone();
let para_id = self.para_id;
async move {
// Check if block is equal or higher than best (this requires a justification)
let relay_chain_info = relay_chain_backend.blockchain().info();
let runtime_api_block_id = BlockId::Hash(relay_chain_info.best_hash);
let relay_chain_best_hash = relay_chain_interface.best_block_hash();
let runtime_api_block_id = BlockId::Hash(relay_chain_best_hash);
let block_number = header.number();
let best_head =
Self::included_block(&*relay_chain_client, &runtime_api_block_id, para_id)?;
Self::included_block(&relay_chain_interface, &runtime_api_block_id, para_id)?;
let known_best_number = best_head.number();
let backed_block =
|| Self::backed_block_hash(&*relay_chain_client, &runtime_api_block_id, para_id);
|| Self::backed_block_hash(&relay_chain_interface, &runtime_api_block_id, para_id);
if best_head == header {
tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
@@ -348,20 +323,17 @@ where
}
}
impl<Block: BlockT, P, B, BCE> BlockAnnounceValidatorT<Block>
for BlockAnnounceValidator<Block, P, B, BCE>
impl<Block: BlockT, RCInterface> BlockAnnounceValidatorT<Block>
for BlockAnnounceValidator<Block, RCInterface>
where
P: ProvideRuntimeApi<PBlock> + Send + Sync + 'static,
P::Api: ParachainHost<PBlock>,
B: Backend<PBlock> + 'static,
BCE: BlockchainEvents<PBlock> + 'static + Send + Sync,
RCInterface: RelayChainInterface + Clone + 'static,
{
fn validate(
&mut self,
header: &Block::Header,
mut data: &[u8],
) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
if self.relay_chain_sync_oracle.is_major_syncing() {
if self.relay_chain_interface.is_major_syncing() {
return ready(Ok(Validation::Success { is_new_best: false })).boxed()
}
@@ -381,9 +353,8 @@ where
.boxed(),
};
let relay_chain_client = self.relay_chain_client.clone();
let relay_chain_interface = self.relay_chain_interface.clone();
let header_encoded = header.encode();
let wait_on_relay_chain_block = self.wait_on_relay_chain_block.clone();
async move {
if let Err(e) = block_announce_data.validate(header_encoded) {
@@ -392,106 +363,19 @@ where
let relay_parent = block_announce_data.receipt.descriptor.relay_parent;
wait_on_relay_chain_block
.wait_on_relay_chain_block(relay_parent)
relay_chain_interface
.wait_for_block(relay_parent)
.await
.map_err(|e| Box::new(BlockAnnounceError(e.to_string())) as Box<_>)?;
block_announce_data
.check_signature(&relay_chain_client)
.check_signature(&relay_chain_interface)
.map_err(|e| Box::new(e) as Box<_>)
}
.boxed()
}
}
/// Build a block announce validator instance.
///
/// Returns a boxed [`BlockAnnounceValidator`].
pub fn build_block_announce_validator<Block: BlockT, B>(
relay_chain_client: polkadot_client::Client,
para_id: ParaId,
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_backend: Arc<B>,
) -> Box<dyn BlockAnnounceValidatorT<Block> + Send>
where
B: Backend<PBlock> + Send + 'static,
{
BlockAnnounceValidatorBuilder::new(
relay_chain_client,
para_id,
relay_chain_sync_oracle,
relay_chain_backend,
)
.build()
}
/// Block announce validator builder.
///
/// Builds a [`BlockAnnounceValidator`] for a parachain. As this requires
/// a concrete relay chain client instance, the builder takes a [`polkadot_client::Client`]
/// that wraps this concrete instanace. By using [`polkadot_client::ExecuteWithClient`]
/// the builder gets access to this concrete instance.
struct BlockAnnounceValidatorBuilder<Block, B> {
phantom: PhantomData<Block>,
relay_chain_client: polkadot_client::Client,
para_id: ParaId,
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_backend: Arc<B>,
}
impl<Block: BlockT, B> BlockAnnounceValidatorBuilder<Block, B>
where
B: Backend<PBlock> + Send + 'static,
{
/// Create a new instance of the builder.
fn new(
relay_chain_client: polkadot_client::Client,
para_id: ParaId,
relay_chain_sync_oracle: Box<dyn SyncOracle + Send>,
relay_chain_backend: Arc<B>,
) -> Self {
Self {
relay_chain_client,
para_id,
relay_chain_sync_oracle,
relay_chain_backend,
phantom: PhantomData,
}
}
/// Build the block announce validator.
fn build(self) -> Box<dyn BlockAnnounceValidatorT<Block> + Send> {
self.relay_chain_client.clone().execute_with(self)
}
}
impl<Block: BlockT, B> polkadot_client::ExecuteWithClient
for BlockAnnounceValidatorBuilder<Block, B>
where
B: Backend<PBlock> + Send + 'static,
{
type Output = Box<dyn BlockAnnounceValidatorT<Block> + Send>;
fn execute_with_client<PClient, Api, PBackend>(self, client: Arc<PClient>) -> Self::Output
where
<Api as sp_api::ApiExt<PBlock>>::StateBackend:
sp_api::StateBackend<sp_runtime::traits::BlakeTwo256>,
PBackend: Backend<PBlock>,
PBackend::State: sp_api::StateBackend<sp_runtime::traits::BlakeTwo256>,
Api: polkadot_client::RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: polkadot_client::AbstractClient<PBlock, PBackend, Api = Api> + 'static,
{
Box::new(BlockAnnounceValidator::new(
client.clone(),
self.para_id,
self.relay_chain_sync_oracle,
self.relay_chain_backend,
client,
))
}
}
/// Wait before announcing a block that a candidate message has been received for this block, then
/// add this message as justification for the block announcement.
///
+193 -174
View File
@@ -15,30 +15,33 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use async_trait::async_trait;
use cumulus_relay_chain_interface::WaitError;
use cumulus_relay_chain_local::{check_block_in_chain, BlockCheckStatus};
use cumulus_test_service::runtime::{Block, Hash, Header};
use futures::{executor::block_on, poll, task::Poll};
use futures::{executor::block_on, poll, task::Poll, FutureExt, StreamExt};
use parking_lot::Mutex;
use polkadot_node_primitives::{SignedFullStatement, Statement};
use polkadot_primitives::v1::{
Block as PBlock, BlockNumber, CandidateCommitments, CandidateDescriptor, CandidateEvent,
CollatorPair, CommittedCandidateReceipt, CoreState, GroupRotationInfo, Hash as PHash, HeadData,
Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption,
ParachainHost, PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex,
SessionInfo, SigningContext, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex,
ValidatorSignature,
Block as PBlock, CandidateCommitments, CandidateDescriptor, CollatorPair,
CommittedCandidateReceipt, Hash as PHash, HeadData, Id as ParaId, InboundDownwardMessage,
InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, SessionIndex,
SigningContext, ValidationCodeHash, ValidatorId,
};
use polkadot_service::Handle;
use polkadot_test_client::{
Client as PClient, ClientBlockImportExt, DefaultTestClientBuilderExt, FullBackend as PBackend,
InitPolkadotBlockBuilder, TestClientBuilder, TestClientBuilderExt,
};
use sp_api::{ApiRef, ProvideRuntimeApi};
use sc_client_api::{Backend, BlockchainEvents};
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockOrigin;
use sp_core::{Pair, H256};
use sp_keyring::Sr25519Keyring;
use sp_keystore::{testing::KeyStore, SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::RuntimeAppPublic;
use std::collections::BTreeMap;
use sp_state_machine::StorageValue;
use std::{collections::BTreeMap, time::Duration};
fn check_error(error: crate::BoxedError, check_error: impl Fn(&BlockAnnounceError) -> bool) {
let error = *error
@@ -50,31 +53,190 @@ fn check_error(error: crate::BoxedError, check_error: impl Fn(&BlockAnnounceErro
}
#[derive(Clone)]
struct DummyCollatorNetwork;
struct DummyRelayChainInterface {
data: Arc<Mutex<ApiData>>,
relay_client: Arc<PClient>,
relay_backend: Arc<PBackend>,
}
impl SyncOracle for DummyCollatorNetwork {
fn is_major_syncing(&mut self) -> bool {
impl DummyRelayChainInterface {
fn new() -> Self {
let builder = TestClientBuilder::new();
let relay_backend = builder.backend();
Self {
data: Arc::new(Mutex::new(ApiData {
validators: vec![Sr25519Keyring::Alice.public().into()],
has_pending_availability: false,
})),
relay_client: Arc::new(builder.build()),
relay_backend,
}
}
}
#[async_trait]
impl RelayChainInterface for DummyRelayChainInterface {
fn validators(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
) -> Result<Vec<ValidatorId>, sp_api::ApiError> {
Ok(self.data.lock().validators.clone())
}
fn block_status(
&self,
block_id: cumulus_primitives_core::relay_chain::BlockId,
) -> Result<sp_blockchain::BlockStatus, sp_blockchain::Error> {
self.relay_backend.blockchain().status(block_id)
}
fn best_block_hash(&self) -> PHash {
self.relay_backend.blockchain().info().best_hash
}
fn retrieve_dmq_contents(&self, _: ParaId, _: PHash) -> Option<Vec<InboundDownwardMessage>> {
unimplemented!("Not needed for test")
}
fn retrieve_all_inbound_hrmp_channel_contents(
&self,
_: ParaId,
_: PHash,
) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
Some(BTreeMap::new())
}
fn persisted_validation_data(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
_: ParaId,
_: OccupiedCoreAssumption,
) -> Result<Option<PersistedValidationData>, sp_api::ApiError> {
Ok(Some(PersistedValidationData {
parent_head: HeadData(default_header().encode()),
..Default::default()
}))
}
fn candidate_pending_availability(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
_: ParaId,
) -> Result<Option<CommittedCandidateReceipt>, sp_api::ApiError> {
if self.data.lock().has_pending_availability {
Ok(Some(CommittedCandidateReceipt {
descriptor: CandidateDescriptor {
para_head: polkadot_parachain::primitives::HeadData(default_header().encode())
.hash(),
para_id: 0u32.into(),
relay_parent: PHash::random(),
collator: CollatorPair::generate().0.public(),
persisted_validation_data_hash: PHash::random().into(),
pov_hash: PHash::random(),
erasure_root: PHash::random(),
signature: sp_core::sr25519::Signature([0u8; 64]).into(),
validation_code_hash: ValidationCodeHash::from(PHash::random()),
},
commitments: CandidateCommitments {
upward_messages: Vec::new(),
horizontal_messages: Vec::new(),
new_validation_code: None,
head_data: HeadData(Vec::new()),
processed_downward_messages: 0,
hrmp_watermark: 0,
},
}))
} else {
Ok(None)
}
}
fn session_index_for_child(
&self,
_: &cumulus_primitives_core::relay_chain::BlockId,
) -> Result<SessionIndex, sp_api::ApiError> {
Ok(0)
}
fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock> {
self.relay_client.import_notification_stream()
}
fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock> {
self.relay_client.finality_notification_stream()
}
fn storage_changes_notification_stream(
&self,
filter_keys: Option<&[sc_client_api::StorageKey]>,
child_filter_keys: Option<
&[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
>,
) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>> {
self.relay_client
.storage_changes_notification_stream(filter_keys, child_filter_keys)
}
fn is_major_syncing(&self) -> bool {
false
}
fn is_offline(&mut self) -> bool {
unimplemented!("Not required in tests")
fn overseer_handle(&self) -> Option<Handle> {
unimplemented!("Not needed for test")
}
fn get_storage_by_key(
&self,
_: &polkadot_service::BlockId,
_: &[u8],
) -> Result<Option<StorageValue>, sp_blockchain::Error> {
unimplemented!("Not needed for test")
}
fn prove_read(
&self,
_: &polkadot_service::BlockId,
_: &Vec<Vec<u8>>,
) -> Result<Option<sc_client_api::StorageProof>, Box<dyn sp_state_machine::Error>> {
unimplemented!("Not needed for test")
}
async fn wait_for_block(
&self,
hash: PHash,
) -> Result<(), cumulus_relay_chain_interface::WaitError> {
let mut listener = match check_block_in_chain(
self.relay_backend.clone(),
self.relay_client.clone(),
hash,
)? {
BlockCheckStatus::InChain => return Ok(()),
BlockCheckStatus::Unknown(listener) => listener,
};
let mut timeout = futures_timer::Delay::new(Duration::from_secs(10)).fuse();
loop {
futures::select! {
_ = timeout => return Err(WaitError::Timeout(hash)),
evt = listener.next() => match evt {
Some(evt) if evt.hash == hash => return Ok(()),
// Not the event we waited on.
Some(_) => continue,
None => return Err(WaitError::ImportListenerClosed(hash)),
}
}
}
}
}
fn make_validator_and_api(
) -> (BlockAnnounceValidator<Block, TestApi, PBackend, PClient>, Arc<TestApi>) {
let api = Arc::new(TestApi::new());
) -> (BlockAnnounceValidator<Block, Arc<DummyRelayChainInterface>>, Arc<DummyRelayChainInterface>) {
let relay_chain_interface = Arc::new(DummyRelayChainInterface::new());
(
BlockAnnounceValidator::new(
api.clone(),
ParaId::from(56),
Box::new(DummyCollatorNetwork),
api.relay_backend.clone(),
api.relay_client.clone(),
),
api,
BlockAnnounceValidator::new(relay_chain_interface.clone(), ParaId::from(56)),
relay_chain_interface,
)
}
@@ -90,7 +252,7 @@ fn default_header() -> Header {
/// Same as [`make_gossip_message_and_header`], but using the genesis header as relay parent.
async fn make_gossip_message_and_header_using_genesis(
api: Arc<TestApi>,
api: Arc<DummyRelayChainInterface>,
validator_index: u32,
) -> (CollationSecondedSignal, Header) {
let relay_parent = api.relay_client.hash(0).ok().flatten().expect("Genesis hash exists");
@@ -99,7 +261,7 @@ async fn make_gossip_message_and_header_using_genesis(
}
async fn make_gossip_message_and_header(
api: Arc<TestApi>,
relay_chain_interface: Arc<DummyRelayChainInterface>,
relay_parent: H256,
validator_index: u32,
) -> (CollationSecondedSignal, Header) {
@@ -110,8 +272,9 @@ async fn make_gossip_message_and_header(
Some(&Sr25519Keyring::Alice.to_seed()),
)
.unwrap();
let session_index =
api.runtime_api().session_index_for_child(&BlockId::Hash(relay_parent)).unwrap();
let session_index = relay_chain_interface
.session_index_for_child(&BlockId::Hash(relay_parent))
.unwrap();
let signing_context = SigningContext { parent_hash: relay_parent, session_index };
let header = default_header();
@@ -292,8 +455,7 @@ fn check_statement_seconded() {
Some(&Sr25519Keyring::Alice.to_seed()),
)
.unwrap();
let session_index =
api.runtime_api().session_index_for_child(&BlockId::Hash(relay_parent)).unwrap();
let session_index = api.session_index_for_child(&BlockId::Hash(relay_parent)).unwrap();
let signing_context = SigningContext { parent_hash: relay_parent, session_index };
let statement = Statement::Valid(Default::default());
@@ -397,146 +559,3 @@ struct ApiData {
validators: Vec<ValidatorId>,
has_pending_availability: bool,
}
struct TestApi {
data: Arc<Mutex<ApiData>>,
relay_client: Arc<PClient>,
relay_backend: Arc<PBackend>,
}
impl TestApi {
fn new() -> Self {
let builder = TestClientBuilder::new();
let relay_backend = builder.backend();
Self {
data: Arc::new(Mutex::new(ApiData {
validators: vec![Sr25519Keyring::Alice.public().into()],
has_pending_availability: false,
})),
relay_client: Arc::new(builder.build()),
relay_backend,
}
}
}
#[derive(Default)]
struct RuntimeApi {
data: Arc<Mutex<ApiData>>,
}
impl ProvideRuntimeApi<PBlock> for TestApi {
type Api = RuntimeApi;
fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> {
RuntimeApi { data: self.data.clone() }.into()
}
}
sp_api::mock_impl_runtime_apis! {
impl ParachainHost<PBlock> for RuntimeApi {
fn validators(&self) -> Vec<ValidatorId> {
self.data.lock().validators.clone()
}
fn validator_groups(&self) -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo<BlockNumber>) {
(Vec::new(), GroupRotationInfo { session_start_block: 0, group_rotation_frequency: 0, now: 0 })
}
fn availability_cores(&self) -> Vec<CoreState<PHash>> {
Vec::new()
}
fn persisted_validation_data(
&self,
_: ParaId,
_: OccupiedCoreAssumption,
) -> Option<PersistedValidationData<PHash, BlockNumber>> {
Some(PersistedValidationData {
parent_head: HeadData(default_header().encode()),
..Default::default()
})
}
fn session_index_for_child(&self) -> SessionIndex {
0
}
fn validation_code(&self, _: ParaId, _: OccupiedCoreAssumption) -> Option<ValidationCode> {
None
}
fn candidate_pending_availability(&self, _: ParaId) -> Option<CommittedCandidateReceipt<PHash>> {
if self.data.lock().has_pending_availability {
Some(CommittedCandidateReceipt {
descriptor: CandidateDescriptor {
para_head: HeadData(
default_header().encode(),
).hash(),
para_id: 0u32.into(),
relay_parent: PHash::random(),
collator: CollatorPair::generate().0.public(),
persisted_validation_data_hash: PHash::random().into(),
pov_hash: PHash::random(),
erasure_root: PHash::random(),
signature: sp_core::sr25519::Signature([0u8; 64]).into(),
validation_code_hash: ValidationCodeHash::from(PHash::random()),
},
commitments: CandidateCommitments {
upward_messages: Vec::new(),
horizontal_messages: Vec::new(),
new_validation_code: None,
head_data: HeadData(Vec::new()),
processed_downward_messages: 0,
hrmp_watermark: 0
}
})
} else {
None
}
}
fn candidate_events(&self) -> Vec<CandidateEvent<PHash>> {
Vec::new()
}
fn session_info(_: SessionIndex) -> Option<SessionInfo> {
None
}
fn check_validation_outputs(_: ParaId, _: CandidateCommitments) -> bool {
false
}
fn dmq_contents(_: ParaId) -> Vec<InboundDownwardMessage<BlockNumber>> {
Vec::new()
}
fn inbound_hrmp_channels_contents(
_: ParaId,
) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumber>>> {
BTreeMap::new()
}
fn assumed_validation_data(
_: ParaId,
_: Hash,
) -> Option<(PersistedValidationData<Hash, BlockNumber>, ValidationCodeHash)> {
None
}
fn validation_code_by_hash(_: ValidationCodeHash) -> Option<ValidationCode> {
None
}
fn on_chain_votes() -> Option<ScrapedOnChainVotes<Hash>> {
None
}
fn submit_pvf_check_statement(_: PvfCheckStatement, _: ValidatorSignature) {}
fn pvfs_require_precheck() -> Vec<ValidationCodeHash> {
Vec::new()
}
}
}
@@ -1,235 +0,0 @@
// Copyright 2020-2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Provides the [`WaitOnRelayChainBlock`] type.
use futures::{future::ready, Future, FutureExt, StreamExt};
use polkadot_primitives::v1::{Block as PBlock, Hash as PHash};
use sc_client_api::{
blockchain::{self, BlockStatus, HeaderBackend},
Backend, BlockchainEvents,
};
use sp_runtime::generic::BlockId;
use std::{sync::Arc, time::Duration};
/// The timeout in seconds after that the waiting for a block should be aborted.
const TIMEOUT_IN_SECONDS: u64 = 6;
/// Custom error type used by [`WaitOnRelayChainBlock`].
#[derive(Debug, derive_more::Display)]
pub enum Error {
#[display(fmt = "Timeout while waiting for relay-chain block `{}` to be imported.", _0)]
Timeout(PHash),
#[display(
fmt = "Import listener closed while waiting for relay-chain block `{}` to be imported.",
_0
)]
ImportListenerClosed(PHash),
#[display(
fmt = "Blockchain returned an error while waiting for relay-chain block `{}` to be imported: {:?}",
_0,
_1
)]
BlockchainError(PHash, blockchain::Error),
}
/// A helper to wait for a given relay chain block in an async way.
///
/// The caller needs to pass the hash of a block it waits for and the function will return when the
/// block is available or an error occurred.
///
/// The waiting for the block is implemented as follows:
///
/// 1. Get a read lock on the import lock from the backend.
///
/// 2. Check if the block is already imported. If yes, return from the function.
///
/// 3. If the block isn't imported yet, add an import notification listener.
///
/// 4. Poll the import notification listener until the block is imported or the timeout is fired.
///
/// The timeout is set to 6 seconds. This should be enough time to import the block in the current
/// round and if not, the new round of the relay chain already started anyway.
pub struct WaitOnRelayChainBlock<B, BCE> {
block_chain_events: Arc<BCE>,
backend: Arc<B>,
}
impl<B, BCE> Clone for WaitOnRelayChainBlock<B, BCE> {
fn clone(&self) -> Self {
Self { backend: self.backend.clone(), block_chain_events: self.block_chain_events.clone() }
}
}
impl<B, BCE> WaitOnRelayChainBlock<B, BCE> {
/// Creates a new instance of `Self`.
pub fn new(backend: Arc<B>, block_chain_events: Arc<BCE>) -> Self {
Self { backend, block_chain_events }
}
}
impl<B, BCE> WaitOnRelayChainBlock<B, BCE>
where
B: Backend<PBlock>,
BCE: BlockchainEvents<PBlock>,
{
pub fn wait_on_relay_chain_block(
&self,
hash: PHash,
) -> impl Future<Output = Result<(), Error>> {
let _lock = self.backend.get_import_lock().read();
match self.backend.blockchain().status(BlockId::Hash(hash)) {
Ok(BlockStatus::InChain) => return ready(Ok(())).boxed(),
Err(err) => return ready(Err(Error::BlockchainError(hash, err))).boxed(),
_ => {},
}
let mut listener = self.block_chain_events.import_notification_stream();
// Now it is safe to drop the lock, even when the block is now imported, it should show
// up in our registered listener.
drop(_lock);
let mut timeout = futures_timer::Delay::new(Duration::from_secs(TIMEOUT_IN_SECONDS)).fuse();
async move {
loop {
futures::select! {
_ = timeout => return Err(Error::Timeout(hash)),
evt = listener.next() => match evt {
Some(evt) if evt.hash == hash => return Ok(()),
// Not the event we waited on.
Some(_) => continue,
None => return Err(Error::ImportListenerClosed(hash)),
}
}
}
}
.boxed()
}
}
#[cfg(test)]
mod tests {
use super::*;
use polkadot_test_client::{
construct_transfer_extrinsic, BlockBuilderExt, Client, ClientBlockImportExt,
DefaultTestClientBuilderExt, ExecutionStrategy, FullBackend, InitPolkadotBlockBuilder,
TestClientBuilder, TestClientBuilderExt,
};
use sp_consensus::BlockOrigin;
use sp_runtime::traits::Block as BlockT;
use futures::{executor::block_on, poll, task::Poll};
fn build_client_backend_and_block() -> (Arc<Client>, Arc<FullBackend>, PBlock) {
let builder =
TestClientBuilder::new().set_execution_strategy(ExecutionStrategy::NativeWhenPossible);
let backend = builder.backend();
let client = Arc::new(builder.build());
let block_builder = client.init_polkadot_block_builder();
let block = block_builder.build().expect("Finalizes the block").block;
(client, backend, block)
}
#[test]
fn returns_directly_for_available_block() {
let (mut client, backend, block) = build_client_backend_and_block();
let hash = block.hash();
block_on(client.import(BlockOrigin::Own, block)).expect("Imports the block");
let wait = WaitOnRelayChainBlock::new(backend, client);
block_on(async move {
// Should be ready on the first poll
assert!(matches!(poll!(wait.wait_on_relay_chain_block(hash)), Poll::Ready(Ok(()))));
});
}
#[test]
fn resolve_after_block_import_notification_was_received() {
let (mut client, backend, block) = build_client_backend_and_block();
let hash = block.hash();
let wait = WaitOnRelayChainBlock::new(backend, client.clone());
block_on(async move {
let mut future = wait.wait_on_relay_chain_block(hash);
// As the block is not yet imported, the first poll should return `Pending`
assert!(poll!(&mut future).is_pending());
// Import the block that should fire the notification
client.import(BlockOrigin::Own, block).await.expect("Imports the block");
// Now it should have received the notification and report that the block was imported
assert!(matches!(poll!(future), Poll::Ready(Ok(()))));
});
}
#[test]
fn wait_for_block_time_out_when_block_is_not_imported() {
let (client, backend, block) = build_client_backend_and_block();
let hash = block.hash();
let wait = WaitOnRelayChainBlock::new(backend, client.clone());
assert!(matches!(block_on(wait.wait_on_relay_chain_block(hash)), Err(Error::Timeout(_))));
}
#[test]
fn do_not_resolve_after_different_block_import_notification_was_received() {
let (mut client, backend, block) = build_client_backend_and_block();
let hash = block.hash();
let ext = construct_transfer_extrinsic(
&*client,
sp_keyring::Sr25519Keyring::Alice,
sp_keyring::Sr25519Keyring::Bob,
1000,
);
let mut block_builder = client.init_polkadot_block_builder();
// Push an extrinsic to get a different block hash.
block_builder.push_polkadot_extrinsic(ext).expect("Push extrinsic");
let block2 = block_builder.build().expect("Build second block").block;
let hash2 = block2.hash();
let wait = WaitOnRelayChainBlock::new(backend, client.clone());
block_on(async move {
let mut future = wait.wait_on_relay_chain_block(hash);
let mut future2 = wait.wait_on_relay_chain_block(hash2);
// As the block is not yet imported, the first poll should return `Pending`
assert!(poll!(&mut future).is_pending());
assert!(poll!(&mut future2).is_pending());
// Import the block that should fire the notification
client.import(BlockOrigin::Own, block2).await.expect("Imports the second block");
// The import notification of the second block should not make this one finish
assert!(poll!(&mut future).is_pending());
// Now it should have received the notification and report that the block was imported
assert!(matches!(poll!(future2), Poll::Ready(Ok(()))));
client.import(BlockOrigin::Own, block).await.expect("Imports the first block");
// Now it should be ready
assert!(matches!(poll!(future), Poll::Ready(Ok(()))));
});
}
}
+1
View File
@@ -22,6 +22,7 @@ polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot", bran
# Cumulus deps
cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-relay-chain-interface = {path = "../relay-chain-interface"}
# other deps
codec = { package = "parity-scale-codec", version = "2.3.0", features = [ "derive" ] }
+13 -20
View File
@@ -44,7 +44,6 @@
use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
use sc_consensus::import_queue::{ImportQueue, IncomingBlock};
use sp_api::ProvideRuntimeApi;
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::{
generic::BlockId,
@@ -54,11 +53,11 @@ use sp_runtime::{
use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT};
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::v1::{
Block as PBlock, CandidateReceipt, CommittedCandidateReceipt, Id as ParaId, ParachainHost,
SessionIndex,
CandidateReceipt, CommittedCandidateReceipt, Id as ParaId, SessionIndex,
};
use cumulus_primitives_core::ParachainBlockData;
use cumulus_relay_chain_interface::RelayChainInterface;
use codec::Decode;
use futures::{select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
@@ -102,15 +101,14 @@ pub struct PoVRecovery<Block: BlockT, PC, IQ, RC> {
relay_chain_slot_duration: Duration,
parachain_client: Arc<PC>,
parachain_import_queue: IQ,
relay_chain_client: Arc<RC>,
relay_chain_interface: RC,
para_id: ParaId,
}
impl<Block: BlockT, PC, IQ, RC> PoVRecovery<Block, PC, IQ, RC>
impl<Block: BlockT, PC, IQ, RCInterface> PoVRecovery<Block, PC, IQ, RCInterface>
where
PC: BlockBackend<Block> + BlockchainEvents<Block> + UsageProvider<Block>,
RC: ProvideRuntimeApi<PBlock> + BlockchainEvents<PBlock>,
RC::Api: ParachainHost<PBlock>,
RCInterface: RelayChainInterface + Clone,
IQ: ImportQueue<Block>,
{
/// Create a new instance.
@@ -119,7 +117,7 @@ where
relay_chain_slot_duration: Duration,
parachain_client: Arc<PC>,
parachain_import_queue: IQ,
relay_chain_client: Arc<RC>,
relay_chain_interface: RCInterface,
para_id: ParaId,
) -> Self {
Self {
@@ -130,7 +128,7 @@ where
waiting_for_parent: HashMap::new(),
parachain_client,
parachain_import_queue,
relay_chain_client,
relay_chain_interface,
para_id,
}
}
@@ -365,7 +363,7 @@ where
let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
let pending_candidates =
pending_candidates(self.relay_chain_client.clone(), self.para_id).fuse();
pending_candidates(self.relay_chain_interface.clone(), self.para_id).fuse();
futures::pin_mut!(pending_candidates);
loop {
@@ -419,20 +417,15 @@ where
}
/// Returns a stream over pending candidates for the parachain corresponding to `para_id`.
fn pending_candidates<RC>(
relay_chain_client: Arc<RC>,
fn pending_candidates(
relay_chain_client: impl RelayChainInterface,
para_id: ParaId,
) -> impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)>
where
RC: ProvideRuntimeApi<PBlock> + BlockchainEvents<PBlock>,
RC::Api: ParachainHost<PBlock>,
{
) -> impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)> {
relay_chain_client.import_notification_stream().filter_map(move |n| {
let runtime_api = relay_chain_client.runtime_api();
let res = runtime_api
let res = relay_chain_client
.candidate_pending_availability(&BlockId::hash(n.hash), para_id)
.and_then(|pa| {
runtime_api
relay_chain_client
.session_index_for_child(&BlockId::hash(n.hash))
.map(|v| pa.map(|pa| (pa, v)))
})
@@ -0,0 +1,21 @@
[package]
authors = ["Parity Technologies <admin@parity.io>"]
name = "cumulus-relay-chain-interface"
version = "0.1.0"
edition = "2021"
[dependencies]
polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" }
cumulus-primitives-core = { path = "../../primitives/core" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
parking_lot = "0.11.1"
derive_more = "0.99.2"
async-trait = "0.1.52"
@@ -0,0 +1,248 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use std::{collections::BTreeMap, sync::Arc};
use cumulus_primitives_core::{
relay_chain::{
v1::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId},
Block as PBlock, BlockId, Hash as PHash, InboundHrmpMessage,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
use polkadot_overseer::Handle as OverseerHandle;
use sc_client_api::{blockchain::BlockStatus, StorageProof};
use sp_api::ApiError;
use sp_state_machine::StorageValue;
use async_trait::async_trait;
#[derive(Debug, derive_more::Display)]
pub enum WaitError {
#[display(fmt = "Timeout while waiting for relay-chain block `{}` to be imported.", _0)]
Timeout(PHash),
#[display(
fmt = "Import listener closed while waiting for relay-chain block `{}` to be imported.",
_0
)]
ImportListenerClosed(PHash),
#[display(
fmt = "Blockchain returned an error while waiting for relay-chain block `{}` to be imported: {:?}",
_0,
_1
)]
BlockchainError(PHash, sp_blockchain::Error),
}
/// Trait that provides all necessary methods for interaction between collator and relay chain.
#[async_trait]
pub trait RelayChainInterface: Send + Sync {
/// Fetch a storage item by key.
fn get_storage_by_key(
&self,
block_id: &BlockId,
key: &[u8],
) -> Result<Option<StorageValue>, sp_blockchain::Error>;
/// Fetch a vector of current validators.
fn validators(&self, block_id: &BlockId) -> Result<Vec<ValidatorId>, ApiError>;
/// Get the status of a given block.
fn block_status(&self, block_id: BlockId) -> Result<BlockStatus, sp_blockchain::Error>;
/// Get the hash of the current best block.
fn best_block_hash(&self) -> PHash;
/// Returns the whole contents of the downward message queue for the parachain we are collating
/// for.
///
/// Returns `None` in case of an error.
fn retrieve_dmq_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> Option<Vec<InboundDownwardMessage>>;
/// Returns channels contents for each inbound HRMP channel addressed to the parachain we are
/// collating for.
///
/// Empty channels are also included.
fn retrieve_all_inbound_hrmp_channel_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>>;
/// Yields the persisted validation data for the given `ParaId` along with an assumption that
/// should be used if the para currently occupies a core.
///
/// Returns `None` if either the para is not registered or the assumption is `Freed`
/// and the para already occupies a core.
fn persisted_validation_data(
&self,
block_id: &BlockId,
para_id: ParaId,
_: OccupiedCoreAssumption,
) -> Result<Option<PersistedValidationData>, ApiError>;
/// Get the receipt of a candidate pending availability. This returns `Some` for any paras
/// assigned to occupied cores in `availability_cores` and `None` otherwise.
fn candidate_pending_availability(
&self,
block_id: &BlockId,
para_id: ParaId,
) -> Result<Option<CommittedCandidateReceipt>, ApiError>;
/// Returns the session index expected at a child of the block.
fn session_index_for_child(&self, block_id: &BlockId) -> Result<SessionIndex, ApiError>;
/// Get a stream of import block notifications.
fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock>;
/// Wait for a block with a given hash in the relay chain.
///
/// This method returns immediately on error or if the block is already
/// reported to be in chain. Otherwise, it waits for the block to arrive.
async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError>;
/// Get a stream of finality notifications.
fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock>;
/// Get a stream of storage change notifications.
fn storage_changes_notification_stream(
&self,
filter_keys: Option<&[sc_client_api::StorageKey]>,
child_filter_keys: Option<
&[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
>,
) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>>;
/// Whether the synchronization service is undergoing major sync.
/// Returns true if so.
fn is_major_syncing(&self) -> bool;
/// Get a handle to the overseer.
fn overseer_handle(&self) -> Option<OverseerHandle>;
/// Generate a storage read proof.
fn prove_read(
&self,
block_id: &BlockId,
relevant_keys: &Vec<Vec<u8>>,
) -> Result<Option<StorageProof>, Box<dyn sp_state_machine::Error>>;
}
#[async_trait]
impl<T> RelayChainInterface for Arc<T>
where
T: RelayChainInterface + ?Sized,
{
fn retrieve_dmq_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> Option<Vec<InboundDownwardMessage>> {
(**self).retrieve_dmq_contents(para_id, relay_parent)
}
fn retrieve_all_inbound_hrmp_channel_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
(**self).retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent)
}
fn persisted_validation_data(
&self,
block_id: &BlockId,
para_id: ParaId,
occupied_core_assumption: OccupiedCoreAssumption,
) -> Result<Option<PersistedValidationData>, ApiError> {
(**self).persisted_validation_data(block_id, para_id, occupied_core_assumption)
}
fn candidate_pending_availability(
&self,
block_id: &BlockId,
para_id: ParaId,
) -> Result<Option<CommittedCandidateReceipt>, ApiError> {
(**self).candidate_pending_availability(block_id, para_id)
}
fn session_index_for_child(&self, block_id: &BlockId) -> Result<SessionIndex, ApiError> {
(**self).session_index_for_child(block_id)
}
fn validators(&self, block_id: &BlockId) -> Result<Vec<ValidatorId>, ApiError> {
(**self).validators(block_id)
}
fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock> {
(**self).import_notification_stream()
}
fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock> {
(**self).finality_notification_stream()
}
fn storage_changes_notification_stream(
&self,
filter_keys: Option<&[sc_client_api::StorageKey]>,
child_filter_keys: Option<
&[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
>,
) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>> {
(**self).storage_changes_notification_stream(filter_keys, child_filter_keys)
}
fn best_block_hash(&self) -> PHash {
(**self).best_block_hash()
}
fn block_status(&self, block_id: BlockId) -> Result<BlockStatus, sp_blockchain::Error> {
(**self).block_status(block_id)
}
fn is_major_syncing(&self) -> bool {
(**self).is_major_syncing()
}
fn overseer_handle(&self) -> Option<OverseerHandle> {
(**self).overseer_handle()
}
fn get_storage_by_key(
&self,
block_id: &BlockId,
key: &[u8],
) -> Result<Option<StorageValue>, sp_blockchain::Error> {
(**self).get_storage_by_key(block_id, key)
}
fn prove_read(
&self,
block_id: &BlockId,
relevant_keys: &Vec<Vec<u8>>,
) -> Result<Option<StorageProof>, Box<dyn sp_state_machine::Error>> {
(**self).prove_read(block_id, relevant_keys)
}
async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError> {
(**self).wait_for_block(hash).await
}
}
@@ -0,0 +1,43 @@
[package]
authors = ["Parity Technologies <admin@parity.io>"]
name = "cumulus-relay-chain-local"
version = "0.1.0"
edition = "2021"
[dependencies]
polkadot-client = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" }
cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-telemetry = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
parking_lot = "0.11.1"
tracing = "0.1.25"
async-trait = "0.1.52"
futures = { version = "0.3.1", features = ["compat"] }
futures-timer = "3.0.2"
[dev-dependencies]
# Cumulus deps
cumulus-test-service = { path = "../../test/service" }
# Polkadot deps
polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# substrate deps
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
+542
View File
@@ -0,0 +1,542 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use cumulus_primitives_core::{
relay_chain::{
v1::{
CommittedCandidateReceipt, OccupiedCoreAssumption, ParachainHost, SessionIndex,
ValidatorId,
},
Block as PBlock, BlockId, Hash as PHash, InboundHrmpMessage,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
use cumulus_relay_chain_interface::{RelayChainInterface, WaitError};
use futures::{FutureExt, StreamExt};
use parking_lot::Mutex;
use polkadot_client::{ClientHandle, ExecuteWithClient, FullBackend};
use polkadot_service::{
AuxStore, BabeApi, CollatorPair, Configuration, Handle, NewFull, Role, TaskManager,
};
use sc_client_api::{
blockchain::BlockStatus, Backend, BlockchainEvents, HeaderBackend, ImportNotifications,
StorageProof, UsageProvider,
};
use sc_telemetry::TelemetryWorkerHandle;
use sp_api::{ApiError, ProvideRuntimeApi};
use sp_consensus::SyncOracle;
use sp_core::{sp_std::collections::btree_map::BTreeMap, Pair};
use sp_state_machine::{Backend as StateBackend, StorageValue};
const LOG_TARGET: &str = "relay-chain-local";
/// The timeout in seconds after that the waiting for a block should be aborted.
const TIMEOUT_IN_SECONDS: u64 = 6;
/// Provides an implementation of the [`RelayChainInterface`] using a local in-process relay chain node.
pub struct RelayChainLocal<Client> {
full_client: Arc<Client>,
backend: Arc<FullBackend>,
sync_oracle: Arc<Mutex<Box<dyn SyncOracle + Send + Sync>>>,
overseer_handle: Option<Handle>,
}
impl<Client> RelayChainLocal<Client> {
/// Create a new instance of [`RelayChainLocal`]
pub fn new(
full_client: Arc<Client>,
backend: Arc<FullBackend>,
sync_oracle: Arc<Mutex<Box<dyn SyncOracle + Send + Sync>>>,
overseer_handle: Option<Handle>,
) -> Self {
Self { full_client, backend, sync_oracle, overseer_handle }
}
}
impl<T> Clone for RelayChainLocal<T> {
fn clone(&self) -> Self {
Self {
full_client: self.full_client.clone(),
backend: self.backend.clone(),
sync_oracle: self.sync_oracle.clone(),
overseer_handle: self.overseer_handle.clone(),
}
}
}
#[async_trait]
impl<Client> RelayChainInterface for RelayChainLocal<Client>
where
Client: ProvideRuntimeApi<PBlock>
+ BlockchainEvents<PBlock>
+ AuxStore
+ UsageProvider<PBlock>
+ Sync
+ Send,
Client::Api: ParachainHost<PBlock> + BabeApi<PBlock>,
{
fn retrieve_dmq_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> Option<Vec<InboundDownwardMessage>> {
self.full_client
.runtime_api()
.dmq_contents_with_context(
&BlockId::hash(relay_parent),
sp_core::ExecutionContext::Importing,
para_id,
)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"An error occured during requesting the downward messages.",
);
})
.ok()
}
fn retrieve_all_inbound_hrmp_channel_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
self.full_client
.runtime_api()
.inbound_hrmp_channels_contents_with_context(
&BlockId::hash(relay_parent),
sp_core::ExecutionContext::Importing,
para_id,
)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"An error occured during requesting the inbound HRMP messages.",
);
})
.ok()
}
fn persisted_validation_data(
&self,
block_id: &BlockId,
para_id: ParaId,
occupied_core_assumption: OccupiedCoreAssumption,
) -> Result<Option<PersistedValidationData>, ApiError> {
self.full_client.runtime_api().persisted_validation_data(
block_id,
para_id,
occupied_core_assumption,
)
}
fn candidate_pending_availability(
&self,
block_id: &BlockId,
para_id: ParaId,
) -> Result<Option<CommittedCandidateReceipt>, ApiError> {
self.full_client.runtime_api().candidate_pending_availability(block_id, para_id)
}
fn session_index_for_child(&self, block_id: &BlockId) -> Result<SessionIndex, ApiError> {
self.full_client.runtime_api().session_index_for_child(block_id)
}
fn validators(&self, block_id: &BlockId) -> Result<Vec<ValidatorId>, ApiError> {
self.full_client.runtime_api().validators(block_id)
}
fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock> {
self.full_client.import_notification_stream()
}
fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock> {
self.full_client.finality_notification_stream()
}
fn storage_changes_notification_stream(
&self,
filter_keys: Option<&[sc_client_api::StorageKey]>,
child_filter_keys: Option<
&[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
>,
) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>> {
self.full_client
.storage_changes_notification_stream(filter_keys, child_filter_keys)
}
fn best_block_hash(&self) -> PHash {
self.backend.blockchain().info().best_hash
}
fn block_status(&self, block_id: BlockId) -> Result<BlockStatus, sp_blockchain::Error> {
self.backend.blockchain().status(block_id)
}
fn is_major_syncing(&self) -> bool {
let mut network = self.sync_oracle.lock();
network.is_major_syncing()
}
fn overseer_handle(&self) -> Option<Handle> {
self.overseer_handle.clone()
}
fn get_storage_by_key(
&self,
block_id: &BlockId,
key: &[u8],
) -> Result<Option<StorageValue>, sp_blockchain::Error> {
let state = self.backend.state_at(*block_id)?;
state.storage(key).map_err(sp_blockchain::Error::Storage)
}
fn prove_read(
&self,
block_id: &BlockId,
relevant_keys: &Vec<Vec<u8>>,
) -> Result<Option<StorageProof>, Box<dyn sp_state_machine::Error>> {
let state_backend = self
.backend
.state_at(*block_id)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?block_id,
error = ?e,
"Cannot obtain the state of the relay chain.",
);
})
.ok();
match state_backend {
Some(state) => sp_state_machine::prove_read(state, relevant_keys)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?block_id,
error = ?e,
"Failed to collect required relay chain state storage proof.",
);
e
})
.map(Some),
None => Ok(None),
}
}
/// Wait for a given relay chain block in an async way.
///
/// The caller needs to pass the hash of a block it waits for and the function will return when the
/// block is available or an error occurred.
///
/// The waiting for the block is implemented as follows:
///
/// 1. Get a read lock on the import lock from the backend.
///
/// 2. Check if the block is already imported. If yes, return from the function.
///
/// 3. If the block isn't imported yet, add an import notification listener.
///
/// 4. Poll the import notification listener until the block is imported or the timeout is fired.
///
/// The timeout is set to 6 seconds. This should be enough time to import the block in the current
/// round and if not, the new round of the relay chain already started anyway.
async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError> {
let mut listener =
match check_block_in_chain(self.backend.clone(), self.full_client.clone(), hash)? {
BlockCheckStatus::InChain => return Ok(()),
BlockCheckStatus::Unknown(listener) => listener,
};
let mut timeout = futures_timer::Delay::new(Duration::from_secs(TIMEOUT_IN_SECONDS)).fuse();
loop {
futures::select! {
_ = timeout => return Err(WaitError::Timeout(hash)),
evt = listener.next() => match evt {
Some(evt) if evt.hash == hash => return Ok(()),
// Not the event we waited on.
Some(_) => continue,
None => return Err(WaitError::ImportListenerClosed(hash)),
}
}
}
}
}
pub enum BlockCheckStatus {
/// Block is in chain
InChain,
/// Block status is unknown, listener can be used to wait for notification
Unknown(ImportNotifications<PBlock>),
}
// Helper function to check if a block is in chain.
pub fn check_block_in_chain<Client>(
backend: Arc<FullBackend>,
client: Arc<Client>,
hash: PHash,
) -> Result<BlockCheckStatus, WaitError>
where
Client: BlockchainEvents<PBlock>,
{
let _lock = backend.get_import_lock().read();
let block_id = BlockId::Hash(hash);
match backend.blockchain().status(block_id) {
Ok(BlockStatus::InChain) => return Ok(BlockCheckStatus::InChain),
Err(err) => return Err(WaitError::BlockchainError(hash, err)),
_ => {},
}
let listener = client.import_notification_stream();
Ok(BlockCheckStatus::Unknown(listener))
}
/// Builder for a concrete relay chain interface, created from a full node. Builds
/// a [`RelayChainLocal`] to access relay chain data necessary for parachain operation.
///
/// The builder takes a [`polkadot_client::Client`]
/// that wraps a concrete instance. By using [`polkadot_client::ExecuteWithClient`]
/// the builder gets access to this concrete instance and instantiates a [`RelayChainLocal`] with it.
struct RelayChainLocalBuilder {
polkadot_client: polkadot_client::Client,
backend: Arc<FullBackend>,
sync_oracle: Arc<Mutex<Box<dyn SyncOracle + Send + Sync>>>,
overseer_handle: Option<Handle>,
}
impl RelayChainLocalBuilder {
pub fn build(self) -> Arc<dyn RelayChainInterface> {
self.polkadot_client.clone().execute_with(self)
}
}
impl ExecuteWithClient for RelayChainLocalBuilder {
type Output = Arc<dyn RelayChainInterface>;
fn execute_with_client<Client, Api, Backend>(self, client: Arc<Client>) -> Self::Output
where
Client: ProvideRuntimeApi<PBlock>
+ BlockchainEvents<PBlock>
+ AuxStore
+ UsageProvider<PBlock>
+ 'static
+ Sync
+ Send,
Client::Api: ParachainHost<PBlock> + BabeApi<PBlock>,
{
Arc::new(RelayChainLocal::new(client, self.backend, self.sync_oracle, self.overseer_handle))
}
}
/// Build the Polkadot full node using the given `config`.
#[sc_tracing::logging::prefix_logs_with("Relaychain")]
fn build_polkadot_full_node(
config: Configuration,
telemetry_worker_handle: Option<TelemetryWorkerHandle>,
) -> Result<(NewFull<polkadot_client::Client>, CollatorPair), polkadot_service::Error> {
let is_light = matches!(config.role, Role::Light);
if is_light {
Err(polkadot_service::Error::Sub("Light client not supported.".into()))
} else {
let collator_key = CollatorPair::generate().0;
let relay_chain_full_node = polkadot_service::build_full(
config,
polkadot_service::IsCollator::Yes(collator_key.clone()),
None,
true,
None,
telemetry_worker_handle,
polkadot_service::RealOverseerGen,
)?;
Ok((relay_chain_full_node, collator_key))
}
}
/// Builds a relay chain interface by constructing a full relay chain node
pub fn build_relay_chain_interface(
polkadot_config: Configuration,
telemetry_worker_handle: Option<TelemetryWorkerHandle>,
task_manager: &mut TaskManager,
) -> Result<(Arc<(dyn RelayChainInterface + 'static)>, CollatorPair), polkadot_service::Error> {
let (full_node, collator_key) =
build_polkadot_full_node(polkadot_config, telemetry_worker_handle).map_err(
|e| match e {
polkadot_service::Error::Sub(x) => x,
s => format!("{}", s).into(),
},
)?;
let sync_oracle: Box<dyn SyncOracle + Send + Sync> = Box::new(full_node.network.clone());
let sync_oracle = Arc::new(Mutex::new(sync_oracle));
let relay_chain_interface_builder = RelayChainLocalBuilder {
polkadot_client: full_node.client.clone(),
backend: full_node.backend.clone(),
sync_oracle,
overseer_handle: full_node.overseer_handle.clone(),
};
task_manager.add_child(full_node.task_manager);
Ok((relay_chain_interface_builder.build(), collator_key))
}
#[cfg(test)]
mod tests {
use parking_lot::Mutex;
use super::*;
use polkadot_primitives::v1::Block as PBlock;
use polkadot_test_client::{
construct_transfer_extrinsic, BlockBuilderExt, Client, ClientBlockImportExt,
DefaultTestClientBuilderExt, ExecutionStrategy, InitPolkadotBlockBuilder,
TestClientBuilder, TestClientBuilderExt,
};
use sc_service::Arc;
use sp_consensus::{BlockOrigin, SyncOracle};
use sp_runtime::traits::Block as BlockT;
use futures::{executor::block_on, poll, task::Poll};
struct DummyNetwork {}
impl SyncOracle for DummyNetwork {
fn is_major_syncing(&mut self) -> bool {
unimplemented!("Not needed for test")
}
fn is_offline(&mut self) -> bool {
unimplemented!("Not needed for test")
}
}
fn build_client_backend_and_block() -> (Arc<Client>, PBlock, RelayChainLocal<Client>) {
let builder =
TestClientBuilder::new().set_execution_strategy(ExecutionStrategy::NativeWhenPossible);
let backend = builder.backend();
let client = Arc::new(builder.build());
let block_builder = client.init_polkadot_block_builder();
let block = block_builder.build().expect("Finalizes the block").block;
let dummy_network: Box<dyn SyncOracle + Sync + Send> = Box::new(DummyNetwork {});
(
client.clone(),
block,
RelayChainLocal::new(
client,
backend.clone(),
Arc::new(Mutex::new(dummy_network)),
None,
),
)
}
#[test]
fn returns_directly_for_available_block() {
let (mut client, block, relay_chain_interface) = build_client_backend_and_block();
let hash = block.hash();
block_on(client.import(BlockOrigin::Own, block)).expect("Imports the block");
block_on(async move {
// Should be ready on the first poll
assert!(matches!(
poll!(relay_chain_interface.wait_for_block(hash)),
Poll::Ready(Ok(()))
));
});
}
#[test]
fn resolve_after_block_import_notification_was_received() {
let (mut client, block, relay_chain_interface) = build_client_backend_and_block();
let hash = block.hash();
block_on(async move {
let mut future = relay_chain_interface.wait_for_block(hash);
// As the block is not yet imported, the first poll should return `Pending`
assert!(poll!(&mut future).is_pending());
// Import the block that should fire the notification
client.import(BlockOrigin::Own, block).await.expect("Imports the block");
// Now it should have received the notification and report that the block was imported
assert!(matches!(poll!(future), Poll::Ready(Ok(()))));
});
}
#[test]
fn wait_for_block_time_out_when_block_is_not_imported() {
let (_, block, relay_chain_interface) = build_client_backend_and_block();
let hash = block.hash();
assert!(matches!(
block_on(relay_chain_interface.wait_for_block(hash)),
Err(WaitError::Timeout(_))
));
}
#[test]
fn do_not_resolve_after_different_block_import_notification_was_received() {
let (mut client, block, relay_chain_interface) = build_client_backend_and_block();
let hash = block.hash();
let ext = construct_transfer_extrinsic(
&*client,
sp_keyring::Sr25519Keyring::Alice,
sp_keyring::Sr25519Keyring::Bob,
1000,
);
let mut block_builder = client.init_polkadot_block_builder();
// Push an extrinsic to get a different block hash.
block_builder.push_polkadot_extrinsic(ext).expect("Push extrinsic");
let block2 = block_builder.build().expect("Build second block").block;
let hash2 = block2.hash();
block_on(async move {
let mut future = relay_chain_interface.wait_for_block(hash);
let mut future2 = relay_chain_interface.wait_for_block(hash2);
// As the block is not yet imported, the first poll should return `Pending`
assert!(poll!(&mut future).is_pending());
assert!(poll!(&mut future2).is_pending());
// Import the block that should fire the notification
client.import(BlockOrigin::Own, block2).await.expect("Imports the second block");
// The import notification of the second block should not make this one finish
assert!(poll!(&mut future).is_pending());
// Now it should have received the notification and report that the block was imported
assert!(matches!(poll!(future2), Poll::Ready(Ok(()))));
client.import(BlockOrigin::Own, block).await.expect("Imports the first block");
// Now it should be ready
assert!(matches!(poll!(future), Poll::Ready(Ok(()))));
});
}
}
+1 -1
View File
@@ -9,6 +9,7 @@ edition = "2021"
cumulus-client-consensus-common = { path = "../consensus/common" }
cumulus-client-collator = { path = "../collator" }
cumulus-client-pov-recovery = { path = "../pov-recovery" }
cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
cumulus-primitives-core = { path = "../../primitives/core" }
# Substrate dependencies
@@ -27,7 +28,6 @@ sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "mas
# Polkadot dependencies
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Other deps
+55 -186
View File
@@ -20,9 +20,8 @@
use cumulus_client_consensus_common::ParachainConsensus;
use cumulus_primitives_core::{CollectCollationInfo, ParaId};
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::v1::{Block as PBlock, CollatorPair};
use polkadot_service::{AbstractClient, Client as PClient, ClientHandle, RuntimeApiCollection};
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_primitives::v1::CollatorPair;
use sc_client_api::{
Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, UsageProvider,
};
@@ -30,47 +29,32 @@ use sc_consensus::{
import_queue::{ImportQueue, IncomingBlock, Link, Origin},
BlockImport,
};
use sc_service::{Configuration, Role, TaskManager};
use sc_telemetry::TelemetryWorkerHandle;
use sc_service::{Configuration, TaskManager};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockOrigin;
use sp_core::{traits::SpawnNamed, Pair};
use sp_core::traits::SpawnNamed;
use sp_runtime::{
traits::{BlakeTwo256, Block as BlockT, NumberFor},
traits::{Block as BlockT, NumberFor},
Justifications,
};
use std::{marker::PhantomData, ops::Deref, sync::Arc};
use std::{sync::Arc, time::Duration};
pub mod genesis;
/// The relay chain full node handle.
pub struct RFullNode<C> {
/// The relay chain full node handles.
pub relay_chain_full_node: polkadot_service::NewFull<C>,
/// The collator key used by the node.
pub collator_key: CollatorPair,
}
impl<C> Deref for RFullNode<C> {
type Target = polkadot_service::NewFull<C>;
fn deref(&self) -> &Self::Target {
&self.relay_chain_full_node
}
}
/// Parameters given to [`start_collator`].
pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, Spawner, RClient, IQ> {
pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawner, IQ> {
pub block_status: Arc<BS>,
pub client: Arc<Client>,
pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
pub spawner: Spawner,
pub para_id: ParaId,
pub relay_chain_full_node: RFullNode<RClient>,
pub relay_chain_interface: RCInterface,
pub task_manager: &'a mut TaskManager,
pub parachain_consensus: Box<dyn ParachainConsensus<Block>>,
pub import_queue: IQ,
pub collator_key: CollatorPair,
pub slot_duration: Duration,
}
/// Start a collator node for a parachain.
@@ -78,7 +62,7 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, Spawner, RClient,
/// A collator is similar to a validator in a normal blockchain.
/// It is responsible for producing blocks and sending the blocks to a
/// parachain validator for validation and inclusion into the relay chain.
pub async fn start_collator<'a, Block, BS, Client, Backend, Spawner, RClient, IQ>(
pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner, IQ>(
StartCollatorParams {
block_status,
client,
@@ -86,10 +70,12 @@ pub async fn start_collator<'a, Block, BS, Client, Backend, Spawner, RClient, IQ
spawner,
para_id,
task_manager,
relay_chain_full_node,
relay_chain_interface,
parachain_consensus,
import_queue,
}: StartCollatorParams<'a, Block, BS, Client, Spawner, RClient, IQ>,
collator_key,
slot_duration,
}: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner, IQ>,
) -> sc_service::error::Result<()>
where
Block: BlockT,
@@ -106,55 +92,58 @@ where
Client::Api: CollectCollationInfo<Block>,
for<'b> &'b Client: BlockImport<Block>,
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
RClient: ClientHandle,
RCInterface: RelayChainInterface + Clone + 'static,
Backend: BackendT<Block> + 'static,
IQ: ImportQueue<Block> + 'static,
{
relay_chain_full_node.client.execute_with(StartConsensus {
let consensus = cumulus_client_consensus_common::run_parachain_consensus(
para_id,
announce_block: announce_block.clone(),
client: client.clone(),
task_manager,
_phantom: PhantomData,
});
client.clone(),
relay_chain_interface.clone(),
announce_block.clone(),
);
relay_chain_full_node.client.execute_with(StartPoVRecovery {
para_id,
client: client.clone(),
import_queue,
task_manager,
overseer_handle: relay_chain_full_node
.overseer_handle
.clone()
task_manager
.spawn_essential_handle()
.spawn("cumulus-consensus", None, consensus);
let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new(
relay_chain_interface
.overseer_handle()
.ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?,
_phantom: PhantomData,
})?;
slot_duration,
client.clone(),
import_queue,
relay_chain_interface.clone(),
para_id,
);
task_manager
.spawn_essential_handle()
.spawn("cumulus-pov-recovery", None, pov_recovery.run());
cumulus_client_collator::start_collator(cumulus_client_collator::StartCollatorParams {
runtime_api: client.clone(),
block_status,
announce_block,
overseer_handle: relay_chain_full_node
.overseer_handle
.clone()
overseer_handle: relay_chain_interface
.overseer_handle()
.ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?,
spawner,
para_id,
key: relay_chain_full_node.collator_key.clone(),
key: collator_key,
parachain_consensus,
})
.await;
task_manager.add_child(relay_chain_full_node.relay_chain_full_node.task_manager);
Ok(())
}
/// Parameters given to [`start_full_node`].
pub struct StartFullNodeParams<'a, Block: BlockT, Client, PClient> {
pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface> {
pub para_id: ParaId,
pub client: Arc<Client>,
pub relay_chain_full_node: RFullNode<PClient>,
pub relay_chain_interface: RCInterface,
pub task_manager: &'a mut TaskManager,
pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
}
@@ -163,14 +152,14 @@ pub struct StartFullNodeParams<'a, Block: BlockT, Client, PClient> {
///
/// A full node will only sync the given parachain and will follow the
/// tip of the chain.
pub fn start_full_node<Block, Client, Backend, PClient>(
pub fn start_full_node<Block, Client, Backend, RCInterface>(
StartFullNodeParams {
client,
announce_block,
task_manager,
relay_chain_full_node,
relay_chain_interface,
para_id,
}: StartFullNodeParams<Block, Client, PClient>,
}: StartFullNodeParams<Block, Client, RCInterface>,
) -> sc_service::error::Result<()>
where
Block: BlockT,
@@ -183,116 +172,22 @@ where
+ 'static,
for<'a> &'a Client: BlockImport<Block>,
Backend: BackendT<Block> + 'static,
PClient: ClientHandle,
RCInterface: RelayChainInterface + Clone + 'static,
{
relay_chain_full_node.client.execute_with(StartConsensus {
announce_block,
let consensus = cumulus_client_consensus_common::run_parachain_consensus(
para_id,
client,
task_manager,
_phantom: PhantomData,
});
client.clone(),
relay_chain_interface.clone(),
announce_block,
);
task_manager.add_child(relay_chain_full_node.relay_chain_full_node.task_manager);
task_manager
.spawn_essential_handle()
.spawn("cumulus-consensus", None, consensus);
Ok(())
}
struct StartConsensus<'a, Block: BlockT, Client, Backend> {
para_id: ParaId,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
client: Arc<Client>,
task_manager: &'a mut TaskManager,
_phantom: PhantomData<Backend>,
}
impl<'a, Block, Client, Backend> polkadot_service::ExecuteWithClient
for StartConsensus<'a, Block, Client, Backend>
where
Block: BlockT,
Client: Finalizer<Block, Backend>
+ UsageProvider<Block>
+ Send
+ Sync
+ BlockBackend<Block>
+ BlockchainEvents<Block>
+ 'static,
for<'b> &'b Client: BlockImport<Block>,
Backend: BackendT<Block> + 'static,
{
type Output = ();
fn execute_with_client<PClient, Api, PBackend>(self, client: Arc<PClient>) -> Self::Output
where
<Api as sp_api::ApiExt<PBlock>>::StateBackend: sp_api::StateBackend<BlakeTwo256>,
PBackend: sc_client_api::Backend<PBlock>,
PBackend::State: sp_api::StateBackend<BlakeTwo256>,
Api: RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: AbstractClient<PBlock, PBackend, Api = Api> + 'static,
{
let consensus = cumulus_client_consensus_common::run_parachain_consensus(
self.para_id,
self.client.clone(),
client.clone(),
self.announce_block,
);
self.task_manager
.spawn_essential_handle()
.spawn("cumulus-consensus", None, consensus);
}
}
struct StartPoVRecovery<'a, Block: BlockT, Client, IQ> {
para_id: ParaId,
client: Arc<Client>,
task_manager: &'a mut TaskManager,
overseer_handle: OverseerHandle,
import_queue: IQ,
_phantom: PhantomData<Block>,
}
impl<'a, Block, Client, IQ> polkadot_service::ExecuteWithClient
for StartPoVRecovery<'a, Block, Client, IQ>
where
Block: BlockT,
Client: UsageProvider<Block>
+ Send
+ Sync
+ BlockBackend<Block>
+ BlockchainEvents<Block>
+ 'static,
IQ: ImportQueue<Block> + 'static,
{
type Output = sc_service::error::Result<()>;
fn execute_with_client<PClient, Api, PBackend>(self, client: Arc<PClient>) -> Self::Output
where
<Api as sp_api::ApiExt<PBlock>>::StateBackend: sp_api::StateBackend<BlakeTwo256>,
PBackend: sc_client_api::Backend<PBlock>,
PBackend::State: sp_api::StateBackend<BlakeTwo256>,
Api: RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: AbstractClient<PBlock, PBackend, Api = Api> + 'static,
{
let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new(
self.overseer_handle,
sc_consensus_babe::Config::get(&*client)?.slot_duration(),
self.client,
self.import_queue,
client,
self.para_id,
);
self.task_manager.spawn_essential_handle().spawn(
"cumulus-pov-recovery",
None,
pov_recovery.run(),
);
Ok(())
}
}
/// Prepare the parachain's node condifugration
///
/// This function will disable the default announcement of Substrate for the parachain in favor
@@ -303,32 +198,6 @@ pub fn prepare_node_config(mut parachain_config: Configuration) -> Configuration
parachain_config
}
/// Build the Polkadot full node using the given `config`.
#[sc_tracing::logging::prefix_logs_with("Relaychain")]
pub fn build_polkadot_full_node(
config: Configuration,
telemetry_worker_handle: Option<TelemetryWorkerHandle>,
) -> Result<RFullNode<PClient>, polkadot_service::Error> {
let is_light = matches!(config.role, Role::Light);
if is_light {
Err(polkadot_service::Error::Sub("Light client not supported.".into()))
} else {
let collator_key = CollatorPair::generate().0;
let relay_chain_full_node = polkadot_service::build_full(
config,
polkadot_service::IsCollator::Yes(collator_key.clone()),
None,
true,
None,
telemetry_worker_handle,
polkadot_service::RealOverseerGen,
)?;
Ok(RFullNode { relay_chain_full_node, collator_key })
}
}
/// A shared import queue
///
/// This is basically a hack until the Substrate side is implemented properly.
@@ -88,6 +88,8 @@ cumulus-client-network = { path = "../../client/network" }
cumulus-client-service = { path = "../../client/service" }
cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-primitives-parachain-inherent = { path = "../../primitives/parachain-inherent" }
cumulus-relay-chain-interface = { path = "../../client/relay-chain-interface" }
cumulus-relay-chain-local = { path = "../../client/relay-chain-local" }
# Polkadot dependencies
polkadot-cli = { git = "https://github.com/paritytech/polkadot", branch = "master" }
+57 -71
View File
@@ -1,7 +1,7 @@
//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
// std
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
// Local Runtime Types
use parachain_template_runtime::{
@@ -9,15 +9,15 @@ use parachain_template_runtime::{
};
// Cumulus Imports
use cumulus_client_consensus_aura::{
build_aura_consensus, BuildAuraConsensusParams, SlotProportion,
};
use cumulus_client_consensus_aura::{AuraConsensus, BuildAuraConsensusParams, SlotProportion};
use cumulus_client_consensus_common::ParachainConsensus;
use cumulus_client_network::build_block_announce_validator;
use cumulus_client_network::BlockAnnounceValidator;
use cumulus_client_service::{
prepare_node_config, start_collator, start_full_node, StartCollatorParams, StartFullNodeParams,
};
use cumulus_primitives_core::ParaId;
use cumulus_relay_chain_interface::RelayChainInterface;
use cumulus_relay_chain_local::build_relay_chain_interface;
// Substrate Imports
use sc_client_api::ExecutorProvider;
@@ -216,7 +216,7 @@ where
Option<&Registry>,
Option<TelemetryHandle>,
&TaskManager,
&polkadot_service::NewFull<polkadot_service::Client>,
Arc<dyn RelayChainInterface>,
Arc<
sc_transaction_pool::FullPool<
Block,
@@ -237,27 +237,23 @@ where
let params = new_partial::<RuntimeApi, Executor, BIQ>(&parachain_config, build_import_queue)?;
let (mut telemetry, telemetry_worker_handle) = params.other;
let relay_chain_full_node =
cumulus_client_service::build_polkadot_full_node(polkadot_config, telemetry_worker_handle)
let client = params.client.clone();
let backend = params.backend.clone();
let mut task_manager = params.task_manager;
let (relay_chain_interface, collator_key) =
build_relay_chain_interface(polkadot_config, telemetry_worker_handle, &mut task_manager)
.map_err(|e| match e {
polkadot_service::Error::Sub(x) => x,
s => format!("{}", s).into(),
})?;
let client = params.client.clone();
let backend = params.backend.clone();
let block_announce_validator = build_block_announce_validator(
relay_chain_full_node.client.clone(),
id,
Box::new(relay_chain_full_node.network.clone()),
relay_chain_full_node.backend.clone(),
);
let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), id);
let force_authoring = parachain_config.force_authoring;
let validator = parachain_config.role.is_authority();
let prometheus_registry = parachain_config.prometheus_registry().cloned();
let transaction_pool = params.transaction_pool.clone();
let mut task_manager = params.task_manager;
let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue);
let (network, system_rpc_tx, start_network) =
sc_service::build_network(sc_service::BuildNetworkParams {
@@ -266,7 +262,9 @@ where
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue: import_queue.clone(),
block_announce_validator_builder: Some(Box::new(|_| block_announce_validator)),
block_announce_validator_builder: Some(Box::new(|_| {
Box::new(block_announce_validator)
})),
warp_sync: None,
})?;
@@ -309,7 +307,7 @@ where
prometheus_registry.as_ref(),
telemetry.as_ref().map(|t| t.handle()),
&task_manager,
&relay_chain_full_node,
relay_chain_interface.clone(),
transaction_pool,
network,
params.keystore_container.sync_keystore(),
@@ -324,10 +322,12 @@ where
announce_block,
client: client.clone(),
task_manager: &mut task_manager,
relay_chain_full_node,
relay_chain_interface,
spawner,
parachain_consensus,
import_queue,
collator_key,
slot_duration: Duration::from_secs(6),
};
start_collator(params).await?;
@@ -337,7 +337,7 @@ where
announce_block,
task_manager: &mut task_manager,
para_id: id,
relay_chain_full_node,
relay_chain_interface,
};
start_full_node(params)?;
@@ -413,7 +413,7 @@ pub async fn start_parachain_node(
prometheus_registry,
telemetry,
task_manager,
relay_chain_node,
relay_chain_interface,
transaction_pool,
sync_oracle,
keystore,
@@ -428,62 +428,48 @@ pub async fn start_parachain_node(
telemetry.clone(),
);
let relay_chain_backend = relay_chain_node.backend.clone();
let relay_chain_client = relay_chain_node.client.clone();
Ok(build_aura_consensus::<
sp_consensus_aura::sr25519::AuthorityPair,
_,
_,
_,
_,
_,
_,
_,
_,
_,
>(BuildAuraConsensusParams {
proposer_factory,
create_inherent_data_providers: move |_, (relay_parent, validation_data)| {
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at_with_client(
relay_parent,
&relay_chain_client,
&*relay_chain_backend,
&validation_data,
id,
);
async move {
let time = sp_timestamp::InherentDataProvider::from_system_time();
Ok(AuraConsensus::build::<sp_consensus_aura::sr25519::AuthorityPair, _, _, _, _, _, _>(
BuildAuraConsensusParams {
proposer_factory,
create_inherent_data_providers: move |_, (relay_parent, validation_data)| {
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_interface,
&validation_data,
id,
);
async move {
let time = sp_timestamp::InherentDataProvider::from_system_time();
let slot =
let slot =
sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_duration(
*time,
slot_duration.slot_duration(),
);
let parachain_inherent = parachain_inherent.ok_or_else(|| {
Box::<dyn std::error::Error + Send + Sync>::from(
"Failed to create parachain inherent",
)
})?;
Ok((time, slot, parachain_inherent))
}
let parachain_inherent = parachain_inherent.ok_or_else(|| {
Box::<dyn std::error::Error + Send + Sync>::from(
"Failed to create parachain inherent",
)
})?;
Ok((time, slot, parachain_inherent))
}
},
block_import: client.clone(),
para_client: client,
backoff_authoring_blocks: Option::<()>::None,
sync_oracle,
keystore,
force_authoring,
slot_duration,
// We got around 500ms for proposing
block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32),
// And a maximum of 750ms if slots are skipped
max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)),
telemetry,
},
block_import: client.clone(),
relay_chain_client: relay_chain_node.client.clone(),
relay_chain_backend: relay_chain_node.backend.clone(),
para_client: client,
backoff_authoring_blocks: Option::<()>::None,
sync_oracle,
keystore,
force_authoring,
slot_duration,
// We got around 500ms for proposing
block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32),
// And a maximum of 750ms if slots are skipped
max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)),
telemetry,
}))
))
},
)
.await
+2
View File
@@ -76,6 +76,8 @@ cumulus-client-service = { path = "../client/service" }
cumulus-client-network = { path = "../client/network" }
cumulus-primitives-core = { path = "../primitives/core" }
cumulus-primitives-parachain-inherent = { path = "../primitives/parachain-inherent" }
cumulus-relay-chain-interface = { path = "../client/relay-chain-interface" }
cumulus-relay-chain-local = { path = "../client/relay-chain-local" }
# Polkadot dependencies
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
+95 -124
View File
@@ -13,13 +13,12 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use cumulus_client_consensus_aura::{
build_aura_consensus, BuildAuraConsensusParams, SlotProportion,
};
use cumulus_client_consensus_aura::{AuraConsensus, BuildAuraConsensusParams, SlotProportion};
use cumulus_client_consensus_common::{
ParachainBlockImport, ParachainCandidate, ParachainConsensus,
};
use cumulus_client_network::build_block_announce_validator;
use cumulus_client_network::BlockAnnounceValidator;
use cumulus_client_service::{
prepare_node_config, start_collator, start_full_node, StartCollatorParams, StartFullNodeParams,
};
@@ -27,6 +26,8 @@ use cumulus_primitives_core::{
relay_chain::v1::{Hash as PHash, PersistedValidationData},
ParaId,
};
use cumulus_relay_chain_interface::RelayChainInterface;
use cumulus_relay_chain_local::build_relay_chain_interface;
use polkadot_service::NativeExecutionDispatch;
use crate::rpc;
@@ -51,7 +52,7 @@ use sp_runtime::{
generic::BlockId,
traits::{BlakeTwo256, Header as HeaderT},
};
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use substrate_prometheus_endpoint::Registry;
/// Native executor instance.
@@ -311,7 +312,7 @@ where
Option<&Registry>,
Option<TelemetryHandle>,
&TaskManager,
&polkadot_service::NewFull<polkadot_service::Client>,
Arc<dyn RelayChainInterface>,
Arc<
sc_transaction_pool::FullPool<
Block,
@@ -332,27 +333,24 @@ where
let params = new_partial::<RuntimeApi, Executor, BIQ>(&parachain_config, build_import_queue)?;
let (mut telemetry, telemetry_worker_handle) = params.other;
let relay_chain_full_node =
cumulus_client_service::build_polkadot_full_node(polkadot_config, telemetry_worker_handle)
let client = params.client.clone();
let backend = params.backend.clone();
let mut task_manager = params.task_manager;
let (relay_chain_interface, collator_key) =
build_relay_chain_interface(polkadot_config, telemetry_worker_handle, &mut task_manager)
.map_err(|e| match e {
polkadot_service::Error::Sub(x) => x,
s => format!("{}", s).into(),
})?;
let client = params.client.clone();
let backend = params.backend.clone();
let block_announce_validator = build_block_announce_validator(
relay_chain_full_node.client.clone(),
id,
Box::new(relay_chain_full_node.network.clone()),
relay_chain_full_node.backend.clone(),
);
let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), id);
let force_authoring = parachain_config.force_authoring;
let validator = parachain_config.role.is_authority();
let prometheus_registry = parachain_config.prometheus_registry().cloned();
let transaction_pool = params.transaction_pool.clone();
let mut task_manager = params.task_manager;
let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue);
let (network, system_rpc_tx, start_network) =
sc_service::build_network(sc_service::BuildNetworkParams {
@@ -361,7 +359,9 @@ where
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue: import_queue.clone(),
block_announce_validator_builder: Some(Box::new(|_| block_announce_validator)),
block_announce_validator_builder: Some(Box::new(|_| {
Box::new(block_announce_validator)
})),
warp_sync: None,
})?;
@@ -392,7 +392,7 @@ where
prometheus_registry.as_ref(),
telemetry.as_ref().map(|t| t.handle()),
&task_manager,
&relay_chain_full_node,
relay_chain_interface.clone(),
transaction_pool,
network,
params.keystore_container.sync_keystore(),
@@ -407,10 +407,12 @@ where
announce_block,
client: client.clone(),
task_manager: &mut task_manager,
relay_chain_full_node,
relay_chain_interface,
spawner,
parachain_consensus,
import_queue,
collator_key,
slot_duration: Duration::from_secs(6),
};
start_collator(params).await?;
@@ -420,7 +422,7 @@ where
announce_block,
task_manager: &mut task_manager,
para_id: id,
relay_chain_full_node,
relay_chain_interface,
};
start_full_node(params)?;
@@ -486,7 +488,7 @@ where
Option<&Registry>,
Option<TelemetryHandle>,
&TaskManager,
&polkadot_service::NewFull<polkadot_service::Client>,
Arc<dyn RelayChainInterface>,
Arc<
sc_transaction_pool::FullPool<
Block,
@@ -507,27 +509,23 @@ where
let params = new_partial::<RuntimeApi, Executor, BIQ>(&parachain_config, build_import_queue)?;
let (mut telemetry, telemetry_worker_handle) = params.other;
let relay_chain_full_node =
cumulus_client_service::build_polkadot_full_node(polkadot_config, telemetry_worker_handle)
let client = params.client.clone();
let backend = params.backend.clone();
let mut task_manager = params.task_manager;
let (relay_chain_interface, collator_key) =
build_relay_chain_interface(polkadot_config, telemetry_worker_handle, &mut task_manager)
.map_err(|e| match e {
polkadot_service::Error::Sub(x) => x,
s => format!("{}", s).into(),
})?;
let client = params.client.clone();
let backend = params.backend.clone();
let block_announce_validator = build_block_announce_validator(
relay_chain_full_node.client.clone(),
id,
Box::new(relay_chain_full_node.network.clone()),
relay_chain_full_node.backend.clone(),
);
let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), id);
let force_authoring = parachain_config.force_authoring;
let validator = parachain_config.role.is_authority();
let prometheus_registry = parachain_config.prometheus_registry().cloned();
let transaction_pool = params.transaction_pool.clone();
let mut task_manager = params.task_manager;
let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue);
let (network, system_rpc_tx, start_network) =
sc_service::build_network(sc_service::BuildNetworkParams {
@@ -536,7 +534,9 @@ where
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue: import_queue.clone(),
block_announce_validator_builder: Some(Box::new(|_| block_announce_validator)),
block_announce_validator_builder: Some(Box::new(|_| {
Box::new(block_announce_validator)
})),
warp_sync: None,
})?;
@@ -579,7 +579,7 @@ where
prometheus_registry.as_ref(),
telemetry.as_ref().map(|t| t.handle()),
&task_manager,
&relay_chain_full_node,
relay_chain_interface.clone(),
transaction_pool,
network,
params.keystore_container.sync_keystore(),
@@ -594,10 +594,12 @@ where
announce_block,
client: client.clone(),
task_manager: &mut task_manager,
relay_chain_full_node,
relay_chain_interface: relay_chain_interface.clone(),
spawner,
parachain_consensus,
import_queue,
collator_key,
slot_duration: Duration::from_secs(6),
};
start_collator(params).await?;
@@ -607,7 +609,7 @@ where
announce_block,
task_manager: &mut task_manager,
para_id: id,
relay_chain_full_node,
relay_chain_interface,
};
start_full_node(params)?;
@@ -698,7 +700,7 @@ pub async fn start_rococo_parachain_node(
prometheus_registry,
telemetry,
task_manager,
relay_chain_node,
relay_chain_interface,
transaction_pool,
sync_oracle,
keystore,
@@ -713,9 +715,8 @@ pub async fn start_rococo_parachain_node(
telemetry.clone(),
);
let relay_chain_backend = relay_chain_node.backend.clone();
let relay_chain_client = relay_chain_node.client.clone();
Ok(build_aura_consensus::<
Ok(AuraConsensus::build::<
sp_consensus_aura::sr25519::AuthorityPair,
_,
_,
@@ -723,17 +724,13 @@ pub async fn start_rococo_parachain_node(
_,
_,
_,
_,
_,
_,
>(BuildAuraConsensusParams {
proposer_factory,
create_inherent_data_providers: move |_, (relay_parent, validation_data)| {
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at_with_client(
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_client,
&*relay_chain_backend,
&relay_chain_interface,
&validation_data,
id,
);
@@ -755,8 +752,6 @@ pub async fn start_rococo_parachain_node(
}
},
block_import: client.clone(),
relay_chain_client: relay_chain_node.client.clone(),
relay_chain_backend: relay_chain_node.backend.clone(),
para_client: client.clone(),
backoff_authoring_blocks: Option::<()>::None,
sync_oracle,
@@ -849,7 +844,7 @@ where
prometheus_registry,
telemetry,
task_manager,
relay_chain_node,
relay_chain_interface,
transaction_pool,
_,
_,
@@ -862,25 +857,20 @@ where
telemetry.clone(),
);
let relay_chain_backend = relay_chain_node.backend.clone();
let relay_chain_client = relay_chain_node.client.clone();
Ok(cumulus_client_consensus_relay_chain::build_relay_chain_consensus(
cumulus_client_consensus_relay_chain::BuildRelayChainConsensusParams {
para_id: id,
proposer_factory,
block_import: client.clone(),
relay_chain_client: relay_chain_node.client.clone(),
relay_chain_backend: relay_chain_node.backend.clone(),
relay_chain_interface: relay_chain_interface.clone(),
create_inherent_data_providers: move |_, (relay_parent, validation_data)| {
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at_with_client(
relay_parent,
&relay_chain_client,
&*relay_chain_backend,
&validation_data,
id,
);
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_interface,
&validation_data,
id,
);
async move {
let parachain_inherent = parachain_inherent.ok_or_else(|| {
Box::<dyn std::error::Error + Send + Sync>::from(
@@ -1119,19 +1109,17 @@ where
prometheus_registry,
telemetry,
task_manager,
relay_chain_node,
relay_chain_interface,
transaction_pool,
sync_oracle,
keystore,
force_authoring| {
let client2 = client.clone();
let relay_chain_backend = relay_chain_node.backend.clone();
let relay_chain_client = relay_chain_node.client.clone();
let spawn_handle = task_manager.spawn_handle();
let transaction_pool2 = transaction_pool.clone();
let telemetry2 = telemetry.clone();
let prometheus_registry2 = prometheus_registry.map(|r| (*r).clone());
let relay_chain_for_aura = relay_chain_interface.clone();
let aura_consensus = BuildOnAccess::Uninitialized(Some(Box::new(move || {
let slot_duration =
cumulus_client_consensus_aura::slot_duration(&*client2).unwrap();
@@ -1144,63 +1132,51 @@ where
telemetry2.clone(),
);
let relay_chain_backend2 = relay_chain_backend.clone();
let relay_chain_client2 = relay_chain_client.clone();
AuraConsensus::build::<sp_consensus_aura::sr25519::AuthorityPair, _, _, _, _, _, _>(
BuildAuraConsensusParams {
proposer_factory,
create_inherent_data_providers:
move |_, (relay_parent, validation_data)| {
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_for_aura,
&validation_data,
id,
);
async move {
let time =
sp_timestamp::InherentDataProvider::from_system_time();
build_aura_consensus::<
sp_consensus_aura::sr25519::AuthorityPair,
_,
_,
_,
_,
_,
_,
_,
_,
_,
>(BuildAuraConsensusParams {
proposer_factory,
create_inherent_data_providers: move |_, (relay_parent, validation_data)| {
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at_with_client(
relay_parent,
&relay_chain_client,
&*relay_chain_backend,
&validation_data,
id,
);
async move {
let time = sp_timestamp::InherentDataProvider::from_system_time();
let slot =
let slot =
sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_duration(
*time,
slot_duration.slot_duration(),
);
let parachain_inherent = parachain_inherent.ok_or_else(|| {
Box::<dyn std::error::Error + Send + Sync>::from(
"Failed to create parachain inherent",
)
})?;
Ok((time, slot, parachain_inherent))
}
let parachain_inherent =
parachain_inherent.ok_or_else(|| {
Box::<dyn std::error::Error + Send + Sync>::from(
"Failed to create parachain inherent",
)
})?;
Ok((time, slot, parachain_inherent))
}
},
block_import: client2.clone(),
para_client: client2.clone(),
backoff_authoring_blocks: Option::<()>::None,
sync_oracle,
keystore,
force_authoring,
slot_duration,
// We got around 500ms for proposing
block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32),
// And a maximum of 750ms if slots are skipped
max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)),
telemetry: telemetry2,
},
block_import: client2.clone(),
relay_chain_client: relay_chain_client2,
relay_chain_backend: relay_chain_backend2,
para_client: client2.clone(),
backoff_authoring_blocks: Option::<()>::None,
sync_oracle,
keystore,
force_authoring,
slot_duration,
// We got around 500ms for proposing
block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32),
// And a maximum of 750ms if slots are skipped
max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)),
telemetry: telemetry2,
})
)
})));
let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
@@ -1211,24 +1187,19 @@ where
telemetry.clone(),
);
let relay_chain_backend = relay_chain_node.backend.clone();
let relay_chain_client = relay_chain_node.client.clone();
let relay_chain_consensus =
cumulus_client_consensus_relay_chain::build_relay_chain_consensus(
cumulus_client_consensus_relay_chain::BuildRelayChainConsensusParams {
para_id: id,
proposer_factory,
block_import: client.clone(),
relay_chain_client: relay_chain_node.client.clone(),
relay_chain_backend: relay_chain_node.backend.clone(),
relay_chain_interface: relay_chain_interface.clone(),
create_inherent_data_providers:
move |_, (relay_parent, validation_data)| {
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at_with_client(
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&relay_chain_client,
&*relay_chain_backend,
&relay_chain_interface,
&validation_data,
id,
);
@@ -15,12 +15,10 @@ sp-state-machine = { git = "https://github.com/paritytech/substrate", optional =
sp-trie = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", optional = true , branch = "master" }
# Polkadot dependencies
polkadot-client = { git = "https://github.com/paritytech/polkadot", optional = true, branch = "master" }
# Cumulus dependencies
cumulus-primitives-core = { path = "../core", default-features = false }
cumulus-test-relay-sproof-builder = { path = "../../test/relay-sproof-builder", optional = true }
cumulus-relay-chain-interface = { path = "../../client/relay-chain-interface", optional = true }
# Other dependencies
async-trait = { version = "0.1.42", optional = true }
@@ -44,6 +42,6 @@ std = [
"sp-runtime",
"sc-client-api",
"sp-api",
"polkadot-client",
"cumulus-relay-chain-interface",
"cumulus-test-relay-sproof-builder"
]
@@ -19,112 +19,35 @@
use crate::ParachainInherentData;
use codec::Decode;
use cumulus_primitives_core::{
relay_chain::{
self,
v1::{HrmpChannelId, ParachainHost},
Block as PBlock, Hash as PHash,
},
InboundDownwardMessage, InboundHrmpMessage, ParaId, PersistedValidationData,
relay_chain::{self, v1::HrmpChannelId, Hash as PHash},
ParaId, PersistedValidationData,
};
use polkadot_client::{Client, ClientHandle, ExecuteWithClient};
use sc_client_api::Backend;
use sp_api::ProvideRuntimeApi;
use cumulus_relay_chain_interface::RelayChainInterface;
use sp_runtime::generic::BlockId;
use sp_state_machine::Backend as _;
use std::collections::BTreeMap;
const LOG_TARGET: &str = "parachain-inherent";
/// Returns the whole contents of the downward message queue for the parachain we are collating
/// for.
///
/// Returns `None` in case of an error.
fn retrieve_dmq_contents<PClient>(
polkadot_client: &PClient,
para_id: ParaId,
relay_parent: PHash,
) -> Option<Vec<InboundDownwardMessage>>
where
PClient: ProvideRuntimeApi<PBlock>,
PClient::Api: ParachainHost<PBlock>,
{
polkadot_client
.runtime_api()
.dmq_contents_with_context(
&BlockId::hash(relay_parent),
sp_core::ExecutionContext::Importing,
para_id,
)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"An error occured during requesting the downward messages.",
);
})
.ok()
}
/// Returns channels contents for each inbound HRMP channel addressed to the parachain we are
/// collating for.
///
/// Empty channels are also included.
fn retrieve_all_inbound_hrmp_channel_contents<PClient>(
polkadot_client: &PClient,
para_id: ParaId,
relay_parent: PHash,
) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>>
where
PClient: ProvideRuntimeApi<PBlock>,
PClient::Api: ParachainHost<PBlock>,
{
polkadot_client
.runtime_api()
.inbound_hrmp_channels_contents_with_context(
&BlockId::hash(relay_parent),
sp_core::ExecutionContext::Importing,
para_id,
)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"An error occured during requesting the inbound HRMP messages.",
);
})
.ok()
}
/// Collect the relevant relay chain state in form of a proof for putting it into the validation
/// data inherent.
fn collect_relay_storage_proof(
polkadot_backend: &impl Backend<PBlock>,
relay_chain_interface: &impl RelayChainInterface,
para_id: ParaId,
relay_parent: PHash,
) -> Option<sp_state_machine::StorageProof> {
use relay_chain::well_known_keys as relay_well_known_keys;
let relay_parent_state_backend = polkadot_backend
.state_at(BlockId::Hash(relay_parent))
let relay_parent_block_id = BlockId::Hash(relay_parent);
let ingress_channels = relay_chain_interface
.get_storage_by_key(
&relay_parent_block_id,
&relay_well_known_keys::hrmp_ingress_channel_index(para_id),
)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"Cannot obtain the state of the relay chain.",
)
})
.ok()?;
let ingress_channels = relay_parent_state_backend
.storage(&relay_well_known_keys::hrmp_ingress_channel_index(para_id))
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Cannot obtain the hrmp ingress channel index."
"Cannot obtain the hrmp ingress channel."
)
})
.ok()?;
@@ -142,16 +65,20 @@ fn collect_relay_storage_proof(
.ok()?
.unwrap_or_default();
let egress_channels = relay_parent_state_backend
.storage(&relay_well_known_keys::hrmp_egress_channel_index(para_id))
let egress_channels = relay_chain_interface
.get_storage_by_key(
&relay_parent_block_id,
&relay_well_known_keys::hrmp_egress_channel_index(para_id),
)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Cannot obtain the hrmp egress channel index.",
"Cannot obtain the hrmp egress channel.",
)
})
.ok()?;
let egress_channels = egress_channels
.map(|raw| <Vec<ParaId>>::decode(&mut &raw[..]))
.transpose()
@@ -181,38 +108,26 @@ fn collect_relay_storage_proof(
relay_well_known_keys::hrmp_channels(HrmpChannelId { sender: para_id, recipient })
}));
sp_state_machine::prove_read(relay_parent_state_backend, relevant_keys)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"Failed to collect required relay chain state storage proof.",
)
})
.ok()
relay_chain_interface.prove_read(&relay_parent_block_id, &relevant_keys).ok()?
}
impl ParachainInherentData {
/// Create the [`ParachainInherentData`] at the given `relay_parent`.
///
/// Returns `None` if the creation failed.
pub fn create_at<PClient>(
pub fn create_at(
relay_parent: PHash,
polkadot_client: &PClient,
polkadot_backend: &impl Backend<PBlock>,
relay_chain_interface: &impl RelayChainInterface,
validation_data: &PersistedValidationData,
para_id: ParaId,
) -> Option<ParachainInherentData>
where
PClient: ProvideRuntimeApi<PBlock>,
PClient::Api: ParachainHost<PBlock>,
{
) -> Option<ParachainInherentData> {
let relay_chain_state =
collect_relay_storage_proof(polkadot_backend, para_id, relay_parent)?;
let downward_messages = retrieve_dmq_contents(polkadot_client, para_id, relay_parent)?;
let horizontal_messages =
retrieve_all_inbound_hrmp_channel_contents(polkadot_client, para_id, relay_parent)?;
collect_relay_storage_proof(relay_chain_interface, para_id, relay_parent)?;
let downward_messages =
relay_chain_interface.retrieve_dmq_contents(para_id, relay_parent)?;
let horizontal_messages = relay_chain_interface
.retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent)?;
Some(ParachainInherentData {
downward_messages,
@@ -221,24 +136,6 @@ impl ParachainInherentData {
relay_chain_state,
})
}
/// Create the [`ParachainInherentData`] at the given `relay_parent`.
///
/// Returns `None` if the creation failed.
pub fn create_at_with_client(
relay_parent: PHash,
polkadot_client: &Client,
relay_chain_backend: &impl Backend<PBlock>,
validation_data: &PersistedValidationData,
para_id: ParaId,
) -> Option<ParachainInherentData> {
polkadot_client.execute_with(CreateAtWithClient {
relay_chain_backend,
validation_data,
para_id,
relay_parent,
})
}
}
#[async_trait::async_trait]
@@ -258,35 +155,3 @@ impl sp_inherents::InherentDataProvider for ParachainInherentData {
None
}
}
/// Special structure to run [`ParachainInherentData::create_at`] with a [`Client`].
struct CreateAtWithClient<'a, B> {
relay_parent: PHash,
relay_chain_backend: &'a B,
validation_data: &'a PersistedValidationData,
para_id: ParaId,
}
impl<'a, B> ExecuteWithClient for CreateAtWithClient<'a, B>
where
B: Backend<PBlock>,
{
type Output = Option<ParachainInherentData>;
fn execute_with_client<Client, Api, Backend>(
self,
client: std::sync::Arc<Client>,
) -> Self::Output
where
Client: ProvideRuntimeApi<PBlock>,
Client::Api: ParachainHost<PBlock>,
{
ParachainInherentData::create_at(
self.relay_parent,
&*client,
self.relay_chain_backend,
self.validation_data,
self.para_id,
)
}
}
+3
View File
@@ -52,9 +52,12 @@ cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-primitives-parachain-inherent = { path = "../../primitives/parachain-inherent" }
cumulus-test-runtime = { path = "../runtime" }
cumulus-test-relay-validation-worker-provider = { path = "../relay-validation-worker-provider" }
cumulus-relay-chain-local = { path = "../../client/relay-chain-local" }
criterion = { version = "0.3.5", features = [ "async_tokio" ] }
parking_lot = "0.11.1"
# RPC related dependencies
jsonrpc-core = "18.0.0"
+22 -28
View File
@@ -21,15 +21,19 @@
mod chain_spec;
mod genesis;
use core::future::Future;
use std::{future::Future, time::Duration};
use cumulus_client_consensus_common::{ParachainCandidate, ParachainConsensus};
use cumulus_client_network::BlockAnnounceValidator;
use cumulus_client_service::{
prepare_node_config, start_collator, start_full_node, StartCollatorParams, StartFullNodeParams,
};
use cumulus_primitives_core::ParaId;
use cumulus_relay_chain_local::RelayChainLocal;
use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi};
use frame_system_rpc_runtime_api::AccountNonceApi;
use parking_lot::Mutex;
use polkadot_primitives::v1::{CollatorPair, Hash as PHash, PersistedValidationData};
use polkadot_service::ProvideRuntimeApi;
use sc_client_api::execution_extensions::ExecutionStrategies;
@@ -214,13 +218,17 @@ where
let client = params.client.clone();
let backend = params.backend.clone();
let block_announce_validator = BlockAnnounceValidator::new(
let relay_chain_interface = Arc::new(RelayChainLocal::new(
relay_chain_full_node.client.clone(),
para_id,
Box::new(relay_chain_full_node.network.clone()),
relay_chain_full_node.backend.clone(),
relay_chain_full_node.client.clone(),
);
Arc::new(Mutex::new(Box::new(relay_chain_full_node.network.clone()))),
relay_chain_full_node.overseer_handle.clone(),
));
task_manager.add_child(relay_chain_full_node.task_manager);
let block_announce_validator =
BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id);
let block_announce_validator_builder = move |_| Box::new(block_announce_validator) as Box<_>;
let prometheus_registry = parachain_config.prometheus_registry().cloned();
@@ -264,6 +272,7 @@ where
.map(|w| (w)(announce_block.clone()))
.unwrap_or_else(|| announce_block);
let relay_chain_interface_for_closure = relay_chain_interface.clone();
if let Some(collator_key) = collator_key {
let parachain_consensus: Box<dyn ParachainConsensus<Block>> = match consensus {
Consensus::RelayChain => {
@@ -274,10 +283,7 @@ where
prometheus_registry.as_ref(),
None,
);
let relay_chain_client = relay_chain_full_node.client.clone();
let relay_chain_backend = relay_chain_full_node.backend.clone();
let relay_chain_interface2 = relay_chain_interface_for_closure.clone();
Box::new(cumulus_client_consensus_relay_chain::RelayChainConsensus::new(
para_id,
proposer_factory,
@@ -285,8 +291,7 @@ where
let parachain_inherent =
cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
relay_parent,
&*relay_chain_client,
&*relay_chain_backend,
&relay_chain_interface_for_closure,
&validation_data,
para_id,
);
@@ -303,16 +308,12 @@ where
}
},
client.clone(),
relay_chain_full_node.client.clone(),
relay_chain_full_node.backend.clone(),
relay_chain_interface2,
))
},
Consensus::Null => Box::new(NullConsensus),
};
let relay_chain_full_node =
relay_chain_full_node.with_client(polkadot_test_service::TestClient);
let params = StartCollatorParams {
block_status: client.clone(),
announce_block,
@@ -321,27 +322,20 @@ where
task_manager: &mut task_manager,
para_id,
parachain_consensus,
relay_chain_full_node: cumulus_client_service::RFullNode {
relay_chain_full_node,
collator_key,
},
relay_chain_interface,
collator_key,
import_queue,
slot_duration: Duration::from_secs(6),
};
start_collator(params).await?;
} else {
let relay_chain_full_node =
relay_chain_full_node.with_client(polkadot_test_service::TestClient);
let params = StartFullNodeParams {
client: client.clone(),
announce_block,
task_manager: &mut task_manager,
para_id,
relay_chain_full_node: cumulus_client_service::RFullNode {
relay_chain_full_node,
collator_key: CollatorPair::generate().0,
},
relay_chain_interface,
};
start_full_node(params)?;