mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 07:31:02 +00:00
Fix multi collator setup (#133)
* Start * Fix compilation * Fix chainspec * Don't set best 2 times for the same block * Check the status of a block before building on it * Check that the block exists before setting it as the new best * Reorder code * Fork choice depends on sync status * Switch branch again
This commit is contained in:
Generated
+161
-150
File diff suppressed because it is too large
Load Diff
+112
-37
@@ -29,16 +29,19 @@ use cumulus_primitives::{
|
|||||||
};
|
};
|
||||||
use cumulus_runtime::ParachainBlockData;
|
use cumulus_runtime::ParachainBlockData;
|
||||||
|
|
||||||
use sc_client_api::{BlockchainEvents, Finalizer, StateBackend, UsageProvider};
|
use sc_client_api::{BlockBackend, BlockchainEvents, Finalizer, StateBackend, UsageProvider};
|
||||||
use sc_service::Configuration;
|
use sc_service::Configuration;
|
||||||
use sp_api::{ApiExt, ProvideRuntimeApi};
|
use sp_api::{ApiExt, ProvideRuntimeApi};
|
||||||
use sp_blockchain::HeaderBackend;
|
use sp_blockchain::HeaderBackend;
|
||||||
use sp_consensus::{
|
use sp_consensus::{
|
||||||
BlockImport, BlockImportParams, BlockOrigin, Environment, Error as ConsensusError,
|
BlockImport, BlockImportParams, BlockOrigin, BlockStatus, Environment, Error as ConsensusError,
|
||||||
ForkChoiceStrategy, Proposal, Proposer, RecordProof,
|
ForkChoiceStrategy, Proposal, Proposer, RecordProof,
|
||||||
};
|
};
|
||||||
use sp_inherents::{InherentData, InherentDataProviders};
|
use sp_inherents::{InherentData, InherentDataProviders};
|
||||||
use sp_runtime::traits::{Block as BlockT, HashFor, Header as HeaderT};
|
use sp_runtime::{
|
||||||
|
generic::BlockId,
|
||||||
|
traits::{Block as BlockT, HashFor, Header as HeaderT},
|
||||||
|
};
|
||||||
|
|
||||||
use polkadot_collator::{
|
use polkadot_collator::{
|
||||||
BuildParachainContext, Network as CollatorNetwork, ParachainContext, RuntimeApiCollection,
|
BuildParachainContext, Network as CollatorNetwork, ParachainContext, RuntimeApiCollection,
|
||||||
@@ -50,7 +53,7 @@ use polkadot_primitives::{
|
|||||||
|
|
||||||
use codec::{Decode, Encode};
|
use codec::{Decode, Encode};
|
||||||
|
|
||||||
use log::{error, trace};
|
use log::{debug, error, trace};
|
||||||
|
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures::task::Spawn;
|
use futures::task::Spawn;
|
||||||
@@ -60,22 +63,24 @@ use std::{marker::PhantomData, pin::Pin, sync::Arc, time::Duration};
|
|||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
|
|
||||||
/// The implementation of the Cumulus `Collator`.
|
/// The implementation of the Cumulus `Collator`.
|
||||||
pub struct Collator<Block: BlockT, PF, BI> {
|
pub struct Collator<Block: BlockT, PF, BI, BS> {
|
||||||
proposer_factory: Arc<Mutex<PF>>,
|
proposer_factory: Arc<Mutex<PF>>,
|
||||||
_phantom: PhantomData<Block>,
|
_phantom: PhantomData<Block>,
|
||||||
inherent_data_providers: InherentDataProviders,
|
inherent_data_providers: InherentDataProviders,
|
||||||
collator_network: Arc<dyn CollatorNetwork>,
|
collator_network: Arc<dyn CollatorNetwork>,
|
||||||
block_import: Arc<Mutex<BI>>,
|
block_import: Arc<Mutex<BI>>,
|
||||||
|
block_status: Arc<BS>,
|
||||||
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
|
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Block: BlockT, PF, BI> Collator<Block, PF, BI> {
|
impl<Block: BlockT, PF, BI, BS> Collator<Block, PF, BI, BS> {
|
||||||
/// Create a new instance.
|
/// Create a new instance.
|
||||||
fn new(
|
fn new(
|
||||||
proposer_factory: PF,
|
proposer_factory: PF,
|
||||||
inherent_data_providers: InherentDataProviders,
|
inherent_data_providers: InherentDataProviders,
|
||||||
collator_network: impl CollatorNetwork + Clone + 'static,
|
collator_network: impl CollatorNetwork + Clone + 'static,
|
||||||
block_import: BI,
|
block_import: BI,
|
||||||
|
block_status: Arc<BS>,
|
||||||
spawner: Arc<dyn Spawn + Send + Sync>,
|
spawner: Arc<dyn Spawn + Send + Sync>,
|
||||||
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
|
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
@@ -92,6 +97,7 @@ impl<Block: BlockT, PF, BI> Collator<Block, PF, BI> {
|
|||||||
_phantom: PhantomData,
|
_phantom: PhantomData,
|
||||||
collator_network,
|
collator_network,
|
||||||
block_import: Arc::new(Mutex::new(block_import)),
|
block_import: Arc::new(Mutex::new(block_import)),
|
||||||
|
block_status,
|
||||||
wait_to_announce,
|
wait_to_announce,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -110,7 +116,7 @@ impl<Block: BlockT, PF, BI> Collator<Block, PF, BI> {
|
|||||||
target: "cumulus-collator",
|
target: "cumulus-collator",
|
||||||
"Failed to create inherent data: {:?}",
|
"Failed to create inherent data: {:?}",
|
||||||
e,
|
e,
|
||||||
);
|
)
|
||||||
})
|
})
|
||||||
.ok()?;
|
.ok()?;
|
||||||
|
|
||||||
@@ -124,7 +130,7 @@ impl<Block: BlockT, PF, BI> Collator<Block, PF, BI> {
|
|||||||
target: "cumulus-collator",
|
target: "cumulus-collator",
|
||||||
"Failed to put validation function params into inherent data: {:?}",
|
"Failed to put validation function params into inherent data: {:?}",
|
||||||
e,
|
e,
|
||||||
);
|
)
|
||||||
})
|
})
|
||||||
.ok()?;
|
.ok()?;
|
||||||
|
|
||||||
@@ -135,7 +141,7 @@ impl<Block: BlockT, PF, BI> Collator<Block, PF, BI> {
|
|||||||
target: "cumulus-collator",
|
target: "cumulus-collator",
|
||||||
"Failed to put downward messages into inherent data: {:?}",
|
"Failed to put downward messages into inherent data: {:?}",
|
||||||
e,
|
e,
|
||||||
);
|
)
|
||||||
})
|
})
|
||||||
.ok()?;
|
.ok()?;
|
||||||
|
|
||||||
@@ -143,7 +149,7 @@ impl<Block: BlockT, PF, BI> Collator<Block, PF, BI> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Block: BlockT, PF, BI> Clone for Collator<Block, PF, BI> {
|
impl<Block: BlockT, PF, BI, BS> Clone for Collator<Block, PF, BI, BS> {
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
proposer_factory: self.proposer_factory.clone(),
|
proposer_factory: self.proposer_factory.clone(),
|
||||||
@@ -151,12 +157,13 @@ impl<Block: BlockT, PF, BI> Clone for Collator<Block, PF, BI> {
|
|||||||
_phantom: PhantomData,
|
_phantom: PhantomData,
|
||||||
collator_network: self.collator_network.clone(),
|
collator_network: self.collator_network.clone(),
|
||||||
block_import: self.block_import.clone(),
|
block_import: self.block_import.clone(),
|
||||||
|
block_status: self.block_status.clone(),
|
||||||
wait_to_announce: self.wait_to_announce.clone(),
|
wait_to_announce: self.wait_to_announce.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Block, PF, BI> ParachainContext for Collator<Block, PF, BI>
|
impl<Block, PF, BI, BS> Collator<Block, PF, BI, BS>
|
||||||
where
|
where
|
||||||
Block: BlockT,
|
Block: BlockT,
|
||||||
PF: Environment<Block> + 'static + Send,
|
PF: Environment<Block> + 'static + Send,
|
||||||
@@ -168,6 +175,63 @@ where
|
|||||||
> + Send
|
> + Send
|
||||||
+ Sync
|
+ Sync
|
||||||
+ 'static,
|
+ 'static,
|
||||||
|
BS: BlockBackend<Block>,
|
||||||
|
{
|
||||||
|
/// Checks the status of the given block hash in the Parachain.
|
||||||
|
///
|
||||||
|
/// Returns `true` if the block could be found and is good to be build on.
|
||||||
|
fn check_block_status(&self, hash: Block::Hash) -> bool {
|
||||||
|
match self.block_status.block_status(&BlockId::Hash(hash)) {
|
||||||
|
Ok(BlockStatus::Queued) => {
|
||||||
|
debug!(
|
||||||
|
target: "cumulus-collator",
|
||||||
|
"Skipping candidate production, because block `{:?}` is still queued for import.", hash,
|
||||||
|
);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
Ok(BlockStatus::InChainWithState) => true,
|
||||||
|
Ok(BlockStatus::InChainPruned) => {
|
||||||
|
error!(
|
||||||
|
target: "cumulus-collator",
|
||||||
|
"Skipping candidate production, because block `{:?}` is already pruned!", hash,
|
||||||
|
);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
Ok(BlockStatus::KnownBad) => {
|
||||||
|
error!(
|
||||||
|
target: "cumulus-collator",
|
||||||
|
"Block `{}` is tagged as known bad and is included in the relay chain! Skipping candidate production!", hash,
|
||||||
|
);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
Ok(BlockStatus::Unknown) => {
|
||||||
|
debug!(
|
||||||
|
target: "cumulus-collator",
|
||||||
|
"Skipping candidate production, because block `{:?}` is unknown.", hash,
|
||||||
|
);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!(target: "cumulus-collator", "Failed to get block status of `{:?}`: {:?}", hash, e);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Block, PF, BI, BS> ParachainContext for Collator<Block, PF, BI, BS>
|
||||||
|
where
|
||||||
|
Block: BlockT,
|
||||||
|
PF: Environment<Block> + 'static + Send,
|
||||||
|
PF::Proposer: Send,
|
||||||
|
BI: BlockImport<
|
||||||
|
Block,
|
||||||
|
Error = ConsensusError,
|
||||||
|
Transaction = <PF::Proposer as Proposer<Block>>::Transaction,
|
||||||
|
> + Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static,
|
||||||
|
BS: BlockBackend<Block>,
|
||||||
{
|
{
|
||||||
type ProduceCandidate =
|
type ProduceCandidate =
|
||||||
Pin<Box<dyn Future<Output = Option<(BlockData, parachain::HeadData)>> + Send>>;
|
Pin<Box<dyn Future<Output = Option<(BlockData, parachain::HeadData)>> + Send>>;
|
||||||
@@ -193,6 +257,10 @@ where
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if !self.check_block_status(last_head.header.hash()) {
|
||||||
|
return future::ready(None).boxed();
|
||||||
|
}
|
||||||
|
|
||||||
let proposer_future = factory.lock().init(&last_head.header);
|
let proposer_future = factory.lock().init(&last_head.header);
|
||||||
|
|
||||||
let wait_to_announce = self.wait_to_announce.clone();
|
let wait_to_announce = self.wait_to_announce.clone();
|
||||||
@@ -205,7 +273,7 @@ where
|
|||||||
target: "cumulus-collator",
|
target: "cumulus-collator",
|
||||||
"Could not create proposer: {:?}",
|
"Could not create proposer: {:?}",
|
||||||
e,
|
e,
|
||||||
);
|
)
|
||||||
})
|
})
|
||||||
.ok()?;
|
.ok()?;
|
||||||
|
|
||||||
@@ -225,7 +293,7 @@ where
|
|||||||
inherent_data,
|
inherent_data,
|
||||||
Default::default(),
|
Default::default(),
|
||||||
//TODO: Fix this.
|
//TODO: Fix this.
|
||||||
Duration::from_secs(6),
|
Duration::from_millis(500),
|
||||||
RecordProof::Yes,
|
RecordProof::Yes,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
@@ -234,7 +302,7 @@ where
|
|||||||
target: "cumulus-collator",
|
target: "cumulus-collator",
|
||||||
"Proposing failed: {:?}",
|
"Proposing failed: {:?}",
|
||||||
e,
|
e,
|
||||||
);
|
)
|
||||||
})
|
})
|
||||||
.ok()?;
|
.ok()?;
|
||||||
|
|
||||||
@@ -261,7 +329,8 @@ where
|
|||||||
|
|
||||||
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
|
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
|
||||||
block_import_params.body = Some(b.extrinsics().to_vec());
|
block_import_params.body = Some(b.extrinsics().to_vec());
|
||||||
block_import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
|
// Best block is determined by the relay chain.
|
||||||
|
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(false));
|
||||||
block_import_params.storage_changes = Some(storage_changes);
|
block_import_params.storage_changes = Some(storage_changes);
|
||||||
|
|
||||||
if let Err(err) = block_import
|
if let Err(err) = block_import
|
||||||
@@ -298,10 +367,11 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Implements `BuildParachainContext` to build a collator instance.
|
/// Implements `BuildParachainContext` to build a collator instance.
|
||||||
pub struct CollatorBuilder<Block: BlockT, PF, BI, Backend, Client> {
|
pub struct CollatorBuilder<Block: BlockT, PF, BI, Backend, Client, BS> {
|
||||||
proposer_factory: PF,
|
proposer_factory: PF,
|
||||||
inherent_data_providers: InherentDataProviders,
|
inherent_data_providers: InherentDataProviders,
|
||||||
block_import: BI,
|
block_import: BI,
|
||||||
|
block_status: Arc<BS>,
|
||||||
para_id: ParaId,
|
para_id: ParaId,
|
||||||
client: Arc<Client>,
|
client: Arc<Client>,
|
||||||
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
|
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
|
||||||
@@ -309,12 +379,15 @@ pub struct CollatorBuilder<Block: BlockT, PF, BI, Backend, Client> {
|
|||||||
_marker: PhantomData<(Block, Backend)>,
|
_marker: PhantomData<(Block, Backend)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Block: BlockT, PF, BI, Backend, Client> CollatorBuilder<Block, PF, BI, Backend, Client> {
|
impl<Block: BlockT, PF, BI, Backend, Client, BS>
|
||||||
|
CollatorBuilder<Block, PF, BI, Backend, Client, BS>
|
||||||
|
{
|
||||||
/// Create a new instance of self.
|
/// Create a new instance of self.
|
||||||
pub fn new(
|
pub fn new(
|
||||||
proposer_factory: PF,
|
proposer_factory: PF,
|
||||||
inherent_data_providers: InherentDataProviders,
|
inherent_data_providers: InherentDataProviders,
|
||||||
block_import: BI,
|
block_import: BI,
|
||||||
|
block_status: Arc<BS>,
|
||||||
para_id: ParaId,
|
para_id: ParaId,
|
||||||
client: Arc<Client>,
|
client: Arc<Client>,
|
||||||
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
|
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
|
||||||
@@ -324,6 +397,7 @@ impl<Block: BlockT, PF, BI, Backend, Client> CollatorBuilder<Block, PF, BI, Back
|
|||||||
proposer_factory,
|
proposer_factory,
|
||||||
inherent_data_providers,
|
inherent_data_providers,
|
||||||
block_import,
|
block_import,
|
||||||
|
block_status,
|
||||||
para_id,
|
para_id,
|
||||||
client,
|
client,
|
||||||
announce_block,
|
announce_block,
|
||||||
@@ -336,8 +410,8 @@ impl<Block: BlockT, PF, BI, Backend, Client> CollatorBuilder<Block, PF, BI, Back
|
|||||||
type TransactionFor<E, Block> =
|
type TransactionFor<E, Block> =
|
||||||
<<E as Environment<Block>>::Proposer as Proposer<Block>>::Transaction;
|
<<E as Environment<Block>>::Proposer as Proposer<Block>>::Transaction;
|
||||||
|
|
||||||
impl<Block: BlockT, PF, BI, Backend, Client> BuildParachainContext
|
impl<Block: BlockT, PF, BI, Backend, Client, BS> BuildParachainContext
|
||||||
for CollatorBuilder<Block, PF, BI, Backend, Client>
|
for CollatorBuilder<Block, PF, BI, Backend, Client, BS>
|
||||||
where
|
where
|
||||||
PF: Environment<Block> + Send + 'static,
|
PF: Environment<Block> + Send + 'static,
|
||||||
BI: BlockImport<Block, Error = sp_consensus::Error, Transaction = TransactionFor<PF, Block>>
|
BI: BlockImport<Block, Error = sp_consensus::Error, Transaction = TransactionFor<PF, Block>>
|
||||||
@@ -350,9 +424,12 @@ where
|
|||||||
+ HeaderBackend<Block>
|
+ HeaderBackend<Block>
|
||||||
+ Send
|
+ Send
|
||||||
+ Sync
|
+ Sync
|
||||||
|
+ BlockBackend<Block>
|
||||||
+ 'static,
|
+ 'static,
|
||||||
|
for<'a> &'a Client: BlockImport<Block>,
|
||||||
|
BS: BlockBackend<Block>,
|
||||||
{
|
{
|
||||||
type ParachainContext = Collator<Block, PF, BI>;
|
type ParachainContext = Collator<Block, PF, BI, BS>;
|
||||||
|
|
||||||
fn build<PClient, Spawner, Extrinsic>(
|
fn build<PClient, Spawner, Extrinsic>(
|
||||||
self,
|
self,
|
||||||
@@ -395,6 +472,7 @@ where
|
|||||||
self.inherent_data_providers,
|
self.inherent_data_providers,
|
||||||
polkadot_network,
|
polkadot_network,
|
||||||
self.block_import,
|
self.block_import,
|
||||||
|
self.block_status,
|
||||||
Arc::new(spawner),
|
Arc::new(spawner),
|
||||||
self.announce_block,
|
self.announce_block,
|
||||||
))
|
))
|
||||||
@@ -417,15 +495,12 @@ mod tests {
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use polkadot_collator::{collate, SignedStatement};
|
use polkadot_collator::{collate, SignedStatement};
|
||||||
use polkadot_primitives::parachain::{HeadData, Id as ParaId};
|
use polkadot_primitives::parachain::Id as ParaId;
|
||||||
|
|
||||||
use sp_blockchain::Result as ClientResult;
|
use sp_blockchain::Result as ClientResult;
|
||||||
use sp_inherents::InherentData;
|
use sp_inherents::InherentData;
|
||||||
use sp_keyring::Sr25519Keyring;
|
use sp_keyring::Sr25519Keyring;
|
||||||
use sp_runtime::{
|
use sp_runtime::traits::{DigestFor, Header as HeaderT};
|
||||||
generic::BlockId,
|
|
||||||
traits::{DigestFor, Header as HeaderT},
|
|
||||||
};
|
|
||||||
use sp_state_machine::StorageProof;
|
use sp_state_machine::StorageProof;
|
||||||
use substrate_test_client::{NativeExecutor, WasmExecutionMethod::Interpreted};
|
use substrate_test_client::{NativeExecutor, WasmExecutionMethod::Interpreted};
|
||||||
|
|
||||||
@@ -504,9 +579,13 @@ mod tests {
|
|||||||
|
|
||||||
impl cumulus_consensus::PolkadotClient for DummyPolkadotClient {
|
impl cumulus_consensus::PolkadotClient for DummyPolkadotClient {
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
type Finalized = Box<dyn futures::Stream<Item = Vec<u8>> + Send + Unpin>;
|
type HeadStream = Box<dyn futures::Stream<Item = Vec<u8>> + Send + Unpin>;
|
||||||
|
|
||||||
fn finalized_heads(&self, _: ParaId) -> ClientResult<Self::Finalized> {
|
fn new_best_heads(&self, _: ParaId) -> ClientResult<Self::HeadStream> {
|
||||||
|
unimplemented!("Not required in tests")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finalized_heads(&self, _: ParaId) -> ClientResult<Self::HeadStream> {
|
||||||
unimplemented!("Not required in tests")
|
unimplemented!("Not required in tests")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -526,13 +605,15 @@ mod tests {
|
|||||||
let spawner = futures::executor::ThreadPool::new().unwrap();
|
let spawner = futures::executor::ThreadPool::new().unwrap();
|
||||||
let announce_block = |_, _| ();
|
let announce_block = |_, _| ();
|
||||||
let block_announce_validator = DelayedBlockAnnounceValidator::new();
|
let block_announce_validator = DelayedBlockAnnounceValidator::new();
|
||||||
|
let client = Arc::new(TestClientBuilder::new().build());
|
||||||
|
|
||||||
let builder = CollatorBuilder::new(
|
let builder = CollatorBuilder::new(
|
||||||
DummyFactory,
|
DummyFactory,
|
||||||
InherentDataProviders::default(),
|
InherentDataProviders::default(),
|
||||||
TestClientBuilder::new().build(),
|
client.clone(),
|
||||||
|
client.clone(),
|
||||||
id,
|
id,
|
||||||
Arc::new(TestClientBuilder::new().build()),
|
client.clone(),
|
||||||
Arc::new(announce_block),
|
Arc::new(announce_block),
|
||||||
block_announce_validator,
|
block_announce_validator,
|
||||||
);
|
);
|
||||||
@@ -554,13 +635,7 @@ mod tests {
|
|||||||
)
|
)
|
||||||
.expect("Creates parachain context");
|
.expect("Creates parachain context");
|
||||||
|
|
||||||
let header = Header::new(
|
let header = client.header(&BlockId::Number(0)).unwrap().unwrap();
|
||||||
0,
|
|
||||||
Default::default(),
|
|
||||||
Default::default(),
|
|
||||||
Default::default(),
|
|
||||||
Default::default(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let collation = collate(
|
let collation = collate(
|
||||||
Default::default(),
|
Default::default(),
|
||||||
@@ -571,7 +646,7 @@ mod tests {
|
|||||||
max_head_data_size: 0,
|
max_head_data_size: 0,
|
||||||
},
|
},
|
||||||
LocalValidationData {
|
LocalValidationData {
|
||||||
parent_head: HeadData(header.encode()),
|
parent_head: parachain::HeadData(HeadData::<Block> { header }.encode()),
|
||||||
balance: 10,
|
balance: 10,
|
||||||
code_upgrade_allowed: None,
|
code_upgrade_allowed: None,
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -89,7 +89,12 @@ where
|
|||||||
let mut block_import_params = BlockImportParams::new(origin, header);
|
let mut block_import_params = BlockImportParams::new(origin, header);
|
||||||
block_import_params.body = body;
|
block_import_params.body = body;
|
||||||
block_import_params.justification = justification;
|
block_import_params.justification = justification;
|
||||||
block_import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
|
|
||||||
|
// Best block is determined by the relay chain, or if we are doing the intial sync
|
||||||
|
// we import all blocks as new best.
|
||||||
|
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(
|
||||||
|
origin == BlockOrigin::NetworkInitialSync,
|
||||||
|
));
|
||||||
block_import_params.post_hash = post_hash;
|
block_import_params.post_hash = post_hash;
|
||||||
|
|
||||||
Ok((block_import_params, None))
|
Ok((block_import_params, None))
|
||||||
|
|||||||
+123
-16
@@ -14,10 +14,13 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
|
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use sc_client_api::{Backend, Finalizer, UsageProvider};
|
use sc_client_api::{Backend, BlockBackend, Finalizer, UsageProvider};
|
||||||
use sp_api::ProvideRuntimeApi;
|
use sp_api::ProvideRuntimeApi;
|
||||||
use sp_blockchain::{Error as ClientError, Result as ClientResult};
|
use sp_blockchain::{Error as ClientError, Result as ClientResult};
|
||||||
use sp_consensus::{Error as ConsensusError, SelectChain as SelectChainT};
|
use sp_consensus::{
|
||||||
|
BlockImport, BlockImportParams, BlockOrigin, BlockStatus, Error as ConsensusError,
|
||||||
|
ForkChoiceStrategy, SelectChain as SelectChainT,
|
||||||
|
};
|
||||||
use sp_runtime::{
|
use sp_runtime::{
|
||||||
generic::BlockId,
|
generic::BlockId,
|
||||||
traits::{Block as BlockT, Header as HeaderT},
|
traits::{Block as BlockT, Header as HeaderT},
|
||||||
@@ -30,7 +33,7 @@ use polkadot_primitives::{
|
|||||||
|
|
||||||
use codec::Decode;
|
use codec::Decode;
|
||||||
use futures::{future, Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
|
use futures::{future, Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
|
||||||
use log::warn;
|
use log::{error, trace, warn};
|
||||||
|
|
||||||
use std::{marker::PhantomData, sync::Arc};
|
use std::{marker::PhantomData, sync::Arc};
|
||||||
|
|
||||||
@@ -59,11 +62,14 @@ pub trait PolkadotClient: Clone + 'static {
|
|||||||
/// The error type for interacting with the Polkadot client.
|
/// The error type for interacting with the Polkadot client.
|
||||||
type Error: std::fmt::Debug + Send;
|
type Error: std::fmt::Debug + Send;
|
||||||
|
|
||||||
/// A stream that yields finalized head-data for a certain parachain.
|
/// A stream that yields head-data for a parachain.
|
||||||
type Finalized: Stream<Item = Vec<u8>> + Send + Unpin;
|
type HeadStream: Stream<Item = Vec<u8>> + Send + Unpin;
|
||||||
|
|
||||||
/// Get a stream of finalized heads.
|
/// Get a stream of new best heads for the given parachain.
|
||||||
fn finalized_heads(&self, para_id: ParaId) -> ClientResult<Self::Finalized>;
|
fn new_best_heads(&self, para_id: ParaId) -> ClientResult<Self::HeadStream>;
|
||||||
|
|
||||||
|
/// Get a stream of finalized heads for the given parachain.
|
||||||
|
fn finalized_heads(&self, para_id: ParaId) -> ClientResult<Self::HeadStream>;
|
||||||
|
|
||||||
/// Returns the parachain head for the given `para_id` at the given block id.
|
/// Returns the parachain head for the given `para_id` at the given block id.
|
||||||
fn parachain_head_at(
|
fn parachain_head_at(
|
||||||
@@ -102,16 +108,16 @@ pub fn follow_polkadot<L, P, Block, B>(
|
|||||||
) -> ClientResult<impl Future<Output = ()> + Send + Unpin>
|
) -> ClientResult<impl Future<Output = ()> + Send + Unpin>
|
||||||
where
|
where
|
||||||
Block: BlockT,
|
Block: BlockT,
|
||||||
L: Finalizer<Block, B> + UsageProvider<Block> + Send + Sync,
|
L: Finalizer<Block, B> + UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
|
||||||
|
for<'a> &'a L: BlockImport<Block>,
|
||||||
P: PolkadotClient,
|
P: PolkadotClient,
|
||||||
B: Backend<Block>,
|
B: Backend<Block>,
|
||||||
{
|
{
|
||||||
let finalized_heads = polkadot.finalized_heads(para_id)?;
|
|
||||||
|
|
||||||
let follow_finalized = {
|
let follow_finalized = {
|
||||||
let local = local.clone();
|
let local = local.clone();
|
||||||
|
|
||||||
finalized_heads
|
polkadot
|
||||||
|
.finalized_heads(para_id)?
|
||||||
.map(|head_data| {
|
.map(|head_data| {
|
||||||
<<Block as BlockT>::Header>::decode(&mut &head_data[..])
|
<<Block as BlockT>::Header>::decode(&mut &head_data[..])
|
||||||
.map_err(|_| Error::InvalidHeadData)
|
.map_err(|_| Error::InvalidHeadData)
|
||||||
@@ -123,11 +129,95 @@ where
|
|||||||
.map(|_| ()),
|
.map(|_| ()),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
.map_err(|e| {
|
||||||
|
warn!(
|
||||||
|
target: "cumulus-consensus",
|
||||||
|
"Failed to finalize block: {:?}", e)
|
||||||
|
})
|
||||||
|
.map(|_| ())
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(follow_finalized
|
Ok(future::select(follow_finalized, follow_new_best(para_id, local, polkadot)?).map(|_| ()))
|
||||||
.map_err(|e| warn!("Could not follow relay-chain: {:?}", e))
|
}
|
||||||
.map(|_| ()))
|
|
||||||
|
/// Follow the relay chain new best head, to update the Parachain new best head.
|
||||||
|
fn follow_new_best<L, P, Block, B>(
|
||||||
|
para_id: ParaId,
|
||||||
|
local: Arc<L>,
|
||||||
|
polkadot: P,
|
||||||
|
) -> ClientResult<impl Future<Output = ()> + Send + Unpin>
|
||||||
|
where
|
||||||
|
Block: BlockT,
|
||||||
|
L: Finalizer<Block, B> + UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
|
||||||
|
for<'a> &'a L: BlockImport<Block>,
|
||||||
|
P: PolkadotClient,
|
||||||
|
B: Backend<Block>,
|
||||||
|
{
|
||||||
|
Ok(polkadot
|
||||||
|
.new_best_heads(para_id)?
|
||||||
|
.filter_map(|head_data| {
|
||||||
|
let res = match <<Block as BlockT>::Header>::decode(&mut &head_data[..]) {
|
||||||
|
Ok(header) => Some(header),
|
||||||
|
Err(err) => {
|
||||||
|
warn!(
|
||||||
|
target: "cumulus-consensus",
|
||||||
|
"Could not decode Parachain header: {:?}", err);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
future::ready(res)
|
||||||
|
})
|
||||||
|
.for_each(move |h| {
|
||||||
|
let hash = h.hash();
|
||||||
|
|
||||||
|
if local.usage_info().chain.best_hash == hash {
|
||||||
|
trace!(
|
||||||
|
target: "cumulus-consensus",
|
||||||
|
"Skipping set new best block, because block `{}` is already the best.",
|
||||||
|
hash,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
// Make sure the block is already known or otherwise we skip setting new best.
|
||||||
|
match local.block_status(&BlockId::Hash(hash)) {
|
||||||
|
Ok(BlockStatus::InChainWithState) => {
|
||||||
|
// Make it the new best block
|
||||||
|
let mut block_import_params =
|
||||||
|
BlockImportParams::new(BlockOrigin::ConsensusBroadcast, h);
|
||||||
|
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(true));
|
||||||
|
block_import_params.import_existing = true;
|
||||||
|
|
||||||
|
if let Err(err) =
|
||||||
|
(&*local).import_block(block_import_params, Default::default())
|
||||||
|
{
|
||||||
|
warn!(
|
||||||
|
target: "cumulus-consensus",
|
||||||
|
"Failed to set new best block `{}` with error: {:?}",
|
||||||
|
hash, err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(BlockStatus::InChainPruned) => {
|
||||||
|
error!(
|
||||||
|
target: "cumulus-collator",
|
||||||
|
"Trying to set pruned block `{:?}` as new best!",
|
||||||
|
hash,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!(
|
||||||
|
target: "cumulus-collator",
|
||||||
|
"Failed to get block status of block `{:?}`: {:?}",
|
||||||
|
hash,
|
||||||
|
e,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
future::ready(())
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> PolkadotClient for Arc<T>
|
impl<T> PolkadotClient for Arc<T>
|
||||||
@@ -137,9 +227,26 @@ where
|
|||||||
{
|
{
|
||||||
type Error = ClientError;
|
type Error = ClientError;
|
||||||
|
|
||||||
type Finalized = Box<dyn Stream<Item = Vec<u8>> + Send + Unpin>;
|
type HeadStream = Box<dyn Stream<Item = Vec<u8>> + Send + Unpin>;
|
||||||
|
|
||||||
fn finalized_heads(&self, para_id: ParaId) -> ClientResult<Self::Finalized> {
|
fn new_best_heads(&self, para_id: ParaId) -> ClientResult<Self::HeadStream> {
|
||||||
|
let polkadot = self.clone();
|
||||||
|
|
||||||
|
let s = self.import_notification_stream().filter_map(move |n| {
|
||||||
|
future::ready(if n.is_new_best {
|
||||||
|
polkadot
|
||||||
|
.parachain_head_at(&BlockId::hash(n.hash), para_id)
|
||||||
|
.ok()
|
||||||
|
.and_then(|h| h)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(Box::new(s))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finalized_heads(&self, para_id: ParaId) -> ClientResult<Self::HeadStream> {
|
||||||
let polkadot = self.clone();
|
let polkadot = self.clone();
|
||||||
|
|
||||||
let s = self.finality_notification_stream().filter_map(move |n| {
|
let s = self.finality_notification_stream().filter_map(move |n| {
|
||||||
|
|||||||
+1
-1
@@ -202,7 +202,7 @@ where
|
|||||||
)) as Box<_>);
|
)) as Box<_>);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Validation::Success { is_new_best: false })
|
Ok(Validation::Success { is_new_best: true })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
File diff suppressed because one or more lines are too long
@@ -123,6 +123,7 @@ pub fn run_collator(
|
|||||||
proposer_factory,
|
proposer_factory,
|
||||||
inherent_data_providers,
|
inherent_data_providers,
|
||||||
block_import,
|
block_import,
|
||||||
|
client.clone(),
|
||||||
id,
|
id,
|
||||||
client,
|
client,
|
||||||
announce_block,
|
announce_block,
|
||||||
|
|||||||
@@ -292,7 +292,6 @@ async fn integration_test() {
|
|||||||
.unwrap_or(2) as u64;
|
.unwrap_or(2) as u64;
|
||||||
let tip = 0;
|
let tip = 0;
|
||||||
let extra: SignedExtra = (
|
let extra: SignedExtra = (
|
||||||
TransactionCallFilter::<IsCallable, polkadot_runtime::Call>::new(),
|
|
||||||
frame_system::CheckSpecVersion::<Runtime>::new(),
|
frame_system::CheckSpecVersion::<Runtime>::new(),
|
||||||
frame_system::CheckTxVersion::<Runtime>::new(),
|
frame_system::CheckTxVersion::<Runtime>::new(),
|
||||||
frame_system::CheckGenesis::<Runtime>::new(),
|
frame_system::CheckGenesis::<Runtime>::new(),
|
||||||
|
|||||||
Reference in New Issue
Block a user