Fix multi collator setup (#133)

* Start

* Fix compilation

* Fix chainspec

* Don't set best 2 times for the same block

* Check the status of a block before building on it

* Check that the block exists before setting it as the new best

* Reorder code

* Fork choice depends on sync status

* Switch branch again
This commit is contained in:
Bastian Köcher
2020-07-02 12:50:04 +02:00
committed by GitHub
parent 30ad930159
commit 6ca066c893
8 changed files with 519 additions and 279 deletions
+112 -37
View File
@@ -29,16 +29,19 @@ use cumulus_primitives::{
};
use cumulus_runtime::ParachainBlockData;
use sc_client_api::{BlockchainEvents, Finalizer, StateBackend, UsageProvider};
use sc_client_api::{BlockBackend, BlockchainEvents, Finalizer, StateBackend, UsageProvider};
use sc_service::Configuration;
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_blockchain::HeaderBackend;
use sp_consensus::{
BlockImport, BlockImportParams, BlockOrigin, Environment, Error as ConsensusError,
BlockImport, BlockImportParams, BlockOrigin, BlockStatus, Environment, Error as ConsensusError,
ForkChoiceStrategy, Proposal, Proposer, RecordProof,
};
use sp_inherents::{InherentData, InherentDataProviders};
use sp_runtime::traits::{Block as BlockT, HashFor, Header as HeaderT};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, HashFor, Header as HeaderT},
};
use polkadot_collator::{
BuildParachainContext, Network as CollatorNetwork, ParachainContext, RuntimeApiCollection,
@@ -50,7 +53,7 @@ use polkadot_primitives::{
use codec::{Decode, Encode};
use log::{error, trace};
use log::{debug, error, trace};
use futures::prelude::*;
use futures::task::Spawn;
@@ -60,22 +63,24 @@ use std::{marker::PhantomData, pin::Pin, sync::Arc, time::Duration};
use parking_lot::Mutex;
/// The implementation of the Cumulus `Collator`.
pub struct Collator<Block: BlockT, PF, BI> {
pub struct Collator<Block: BlockT, PF, BI, BS> {
proposer_factory: Arc<Mutex<PF>>,
_phantom: PhantomData<Block>,
inherent_data_providers: InherentDataProviders,
collator_network: Arc<dyn CollatorNetwork>,
block_import: Arc<Mutex<BI>>,
block_status: Arc<BS>,
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
}
impl<Block: BlockT, PF, BI> Collator<Block, PF, BI> {
impl<Block: BlockT, PF, BI, BS> Collator<Block, PF, BI, BS> {
/// Create a new instance.
fn new(
proposer_factory: PF,
inherent_data_providers: InherentDataProviders,
collator_network: impl CollatorNetwork + Clone + 'static,
block_import: BI,
block_status: Arc<BS>,
spawner: Arc<dyn Spawn + Send + Sync>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
) -> Self {
@@ -92,6 +97,7 @@ impl<Block: BlockT, PF, BI> Collator<Block, PF, BI> {
_phantom: PhantomData,
collator_network,
block_import: Arc::new(Mutex::new(block_import)),
block_status,
wait_to_announce,
}
}
@@ -110,7 +116,7 @@ impl<Block: BlockT, PF, BI> Collator<Block, PF, BI> {
target: "cumulus-collator",
"Failed to create inherent data: {:?}",
e,
);
)
})
.ok()?;
@@ -124,7 +130,7 @@ impl<Block: BlockT, PF, BI> Collator<Block, PF, BI> {
target: "cumulus-collator",
"Failed to put validation function params into inherent data: {:?}",
e,
);
)
})
.ok()?;
@@ -135,7 +141,7 @@ impl<Block: BlockT, PF, BI> Collator<Block, PF, BI> {
target: "cumulus-collator",
"Failed to put downward messages into inherent data: {:?}",
e,
);
)
})
.ok()?;
@@ -143,7 +149,7 @@ impl<Block: BlockT, PF, BI> Collator<Block, PF, BI> {
}
}
impl<Block: BlockT, PF, BI> Clone for Collator<Block, PF, BI> {
impl<Block: BlockT, PF, BI, BS> Clone for Collator<Block, PF, BI, BS> {
fn clone(&self) -> Self {
Self {
proposer_factory: self.proposer_factory.clone(),
@@ -151,12 +157,13 @@ impl<Block: BlockT, PF, BI> Clone for Collator<Block, PF, BI> {
_phantom: PhantomData,
collator_network: self.collator_network.clone(),
block_import: self.block_import.clone(),
block_status: self.block_status.clone(),
wait_to_announce: self.wait_to_announce.clone(),
}
}
}
impl<Block, PF, BI> ParachainContext for Collator<Block, PF, BI>
impl<Block, PF, BI, BS> Collator<Block, PF, BI, BS>
where
Block: BlockT,
PF: Environment<Block> + 'static + Send,
@@ -168,6 +175,63 @@ where
> + Send
+ Sync
+ 'static,
BS: BlockBackend<Block>,
{
/// Checks the status of the given block hash in the Parachain.
///
/// Returns `true` if the block could be found and is good to be build on.
fn check_block_status(&self, hash: Block::Hash) -> bool {
match self.block_status.block_status(&BlockId::Hash(hash)) {
Ok(BlockStatus::Queued) => {
debug!(
target: "cumulus-collator",
"Skipping candidate production, because block `{:?}` is still queued for import.", hash,
);
false
}
Ok(BlockStatus::InChainWithState) => true,
Ok(BlockStatus::InChainPruned) => {
error!(
target: "cumulus-collator",
"Skipping candidate production, because block `{:?}` is already pruned!", hash,
);
false
}
Ok(BlockStatus::KnownBad) => {
error!(
target: "cumulus-collator",
"Block `{}` is tagged as known bad and is included in the relay chain! Skipping candidate production!", hash,
);
false
}
Ok(BlockStatus::Unknown) => {
debug!(
target: "cumulus-collator",
"Skipping candidate production, because block `{:?}` is unknown.", hash,
);
false
}
Err(e) => {
error!(target: "cumulus-collator", "Failed to get block status of `{:?}`: {:?}", hash, e);
false
}
}
}
}
impl<Block, PF, BI, BS> ParachainContext for Collator<Block, PF, BI, BS>
where
Block: BlockT,
PF: Environment<Block> + 'static + Send,
PF::Proposer: Send,
BI: BlockImport<
Block,
Error = ConsensusError,
Transaction = <PF::Proposer as Proposer<Block>>::Transaction,
> + Send
+ Sync
+ 'static,
BS: BlockBackend<Block>,
{
type ProduceCandidate =
Pin<Box<dyn Future<Output = Option<(BlockData, parachain::HeadData)>> + Send>>;
@@ -193,6 +257,10 @@ where
}
};
if !self.check_block_status(last_head.header.hash()) {
return future::ready(None).boxed();
}
let proposer_future = factory.lock().init(&last_head.header);
let wait_to_announce = self.wait_to_announce.clone();
@@ -205,7 +273,7 @@ where
target: "cumulus-collator",
"Could not create proposer: {:?}",
e,
);
)
})
.ok()?;
@@ -225,7 +293,7 @@ where
inherent_data,
Default::default(),
//TODO: Fix this.
Duration::from_secs(6),
Duration::from_millis(500),
RecordProof::Yes,
)
.await
@@ -234,7 +302,7 @@ where
target: "cumulus-collator",
"Proposing failed: {:?}",
e,
);
)
})
.ok()?;
@@ -261,7 +329,8 @@ where
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
block_import_params.body = Some(b.extrinsics().to_vec());
block_import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
// Best block is determined by the relay chain.
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(false));
block_import_params.storage_changes = Some(storage_changes);
if let Err(err) = block_import
@@ -298,10 +367,11 @@ where
}
/// Implements `BuildParachainContext` to build a collator instance.
pub struct CollatorBuilder<Block: BlockT, PF, BI, Backend, Client> {
pub struct CollatorBuilder<Block: BlockT, PF, BI, Backend, Client, BS> {
proposer_factory: PF,
inherent_data_providers: InherentDataProviders,
block_import: BI,
block_status: Arc<BS>,
para_id: ParaId,
client: Arc<Client>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
@@ -309,12 +379,15 @@ pub struct CollatorBuilder<Block: BlockT, PF, BI, Backend, Client> {
_marker: PhantomData<(Block, Backend)>,
}
impl<Block: BlockT, PF, BI, Backend, Client> CollatorBuilder<Block, PF, BI, Backend, Client> {
impl<Block: BlockT, PF, BI, Backend, Client, BS>
CollatorBuilder<Block, PF, BI, Backend, Client, BS>
{
/// Create a new instance of self.
pub fn new(
proposer_factory: PF,
inherent_data_providers: InherentDataProviders,
block_import: BI,
block_status: Arc<BS>,
para_id: ParaId,
client: Arc<Client>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
@@ -324,6 +397,7 @@ impl<Block: BlockT, PF, BI, Backend, Client> CollatorBuilder<Block, PF, BI, Back
proposer_factory,
inherent_data_providers,
block_import,
block_status,
para_id,
client,
announce_block,
@@ -336,8 +410,8 @@ impl<Block: BlockT, PF, BI, Backend, Client> CollatorBuilder<Block, PF, BI, Back
type TransactionFor<E, Block> =
<<E as Environment<Block>>::Proposer as Proposer<Block>>::Transaction;
impl<Block: BlockT, PF, BI, Backend, Client> BuildParachainContext
for CollatorBuilder<Block, PF, BI, Backend, Client>
impl<Block: BlockT, PF, BI, Backend, Client, BS> BuildParachainContext
for CollatorBuilder<Block, PF, BI, Backend, Client, BS>
where
PF: Environment<Block> + Send + 'static,
BI: BlockImport<Block, Error = sp_consensus::Error, Transaction = TransactionFor<PF, Block>>
@@ -350,9 +424,12 @@ where
+ HeaderBackend<Block>
+ Send
+ Sync
+ BlockBackend<Block>
+ 'static,
for<'a> &'a Client: BlockImport<Block>,
BS: BlockBackend<Block>,
{
type ParachainContext = Collator<Block, PF, BI>;
type ParachainContext = Collator<Block, PF, BI, BS>;
fn build<PClient, Spawner, Extrinsic>(
self,
@@ -395,6 +472,7 @@ where
self.inherent_data_providers,
polkadot_network,
self.block_import,
self.block_status,
Arc::new(spawner),
self.announce_block,
))
@@ -417,15 +495,12 @@ mod tests {
use std::time::Duration;
use polkadot_collator::{collate, SignedStatement};
use polkadot_primitives::parachain::{HeadData, Id as ParaId};
use polkadot_primitives::parachain::Id as ParaId;
use sp_blockchain::Result as ClientResult;
use sp_inherents::InherentData;
use sp_keyring::Sr25519Keyring;
use sp_runtime::{
generic::BlockId,
traits::{DigestFor, Header as HeaderT},
};
use sp_runtime::traits::{DigestFor, Header as HeaderT};
use sp_state_machine::StorageProof;
use substrate_test_client::{NativeExecutor, WasmExecutionMethod::Interpreted};
@@ -504,9 +579,13 @@ mod tests {
impl cumulus_consensus::PolkadotClient for DummyPolkadotClient {
type Error = Error;
type Finalized = Box<dyn futures::Stream<Item = Vec<u8>> + Send + Unpin>;
type HeadStream = Box<dyn futures::Stream<Item = Vec<u8>> + Send + Unpin>;
fn finalized_heads(&self, _: ParaId) -> ClientResult<Self::Finalized> {
fn new_best_heads(&self, _: ParaId) -> ClientResult<Self::HeadStream> {
unimplemented!("Not required in tests")
}
fn finalized_heads(&self, _: ParaId) -> ClientResult<Self::HeadStream> {
unimplemented!("Not required in tests")
}
@@ -526,13 +605,15 @@ mod tests {
let spawner = futures::executor::ThreadPool::new().unwrap();
let announce_block = |_, _| ();
let block_announce_validator = DelayedBlockAnnounceValidator::new();
let client = Arc::new(TestClientBuilder::new().build());
let builder = CollatorBuilder::new(
DummyFactory,
InherentDataProviders::default(),
TestClientBuilder::new().build(),
client.clone(),
client.clone(),
id,
Arc::new(TestClientBuilder::new().build()),
client.clone(),
Arc::new(announce_block),
block_announce_validator,
);
@@ -554,13 +635,7 @@ mod tests {
)
.expect("Creates parachain context");
let header = Header::new(
0,
Default::default(),
Default::default(),
Default::default(),
Default::default(),
);
let header = client.header(&BlockId::Number(0)).unwrap().unwrap();
let collation = collate(
Default::default(),
@@ -571,7 +646,7 @@ mod tests {
max_head_data_size: 0,
},
LocalValidationData {
parent_head: HeadData(header.encode()),
parent_head: parachain::HeadData(HeadData::<Block> { header }.encode()),
balance: 10,
code_upgrade_allowed: None,
},