Plumb polkadot client into the collator struct (#255)

* plumb polkadot_client into Collator

* plumb para_id into Collator

* promote retrieve_dmq_contents to a method

* remove the retrieve_dmq_contents closure
This commit is contained in:
Sergei Shulepov
2020-12-03 23:02:19 +01:00
committed by GitHub
parent 5a97db73fb
commit 233b347a58
+44 -33
View File
@@ -60,20 +60,22 @@ type TransactionFor<E, Block> =
<<E as Environment<Block>>::Proposer as Proposer<Block>>::Transaction; <<E as Environment<Block>>::Proposer as Proposer<Block>>::Transaction;
/// The implementation of the Cumulus `Collator`. /// The implementation of the Cumulus `Collator`.
pub struct Collator<Block: BlockT, PF, BI, BS, Backend> { pub struct Collator<Block: BlockT, PF, BI, BS, Backend, PBackend, PClient> {
para_id: ParaId,
proposer_factory: Arc<Mutex<PF>>, proposer_factory: Arc<Mutex<PF>>,
_phantom: PhantomData<Block>, _phantom: PhantomData<(Block, PBackend)>,
inherent_data_providers: InherentDataProviders, inherent_data_providers: InherentDataProviders,
block_import: Arc<Mutex<BI>>, block_import: Arc<Mutex<BI>>,
block_status: Arc<BS>, block_status: Arc<BS>,
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>, wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
backend: Arc<Backend>, backend: Arc<Backend>,
retrieve_dmq_contents: Arc<dyn Fn(PHash) -> Option<DownwardMessagesType> + Send + Sync>, polkadot_client: Arc<PClient>,
} }
impl<Block: BlockT, PF, BI, BS, Backend> Clone for Collator<Block, PF, BI, BS, Backend> { impl<Block: BlockT, PF, BI, BS, Backend, PBackend, PClient> Clone for Collator<Block, PF, BI, BS, Backend, PBackend, PClient> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
para_id: self.para_id.clone(),
proposer_factory: self.proposer_factory.clone(), proposer_factory: self.proposer_factory.clone(),
inherent_data_providers: self.inherent_data_providers.clone(), inherent_data_providers: self.inherent_data_providers.clone(),
_phantom: PhantomData, _phantom: PhantomData,
@@ -81,12 +83,12 @@ impl<Block: BlockT, PF, BI, BS, Backend> Clone for Collator<Block, PF, BI, BS, B
block_status: self.block_status.clone(), block_status: self.block_status.clone(),
wait_to_announce: self.wait_to_announce.clone(), wait_to_announce: self.wait_to_announce.clone(),
backend: self.backend.clone(), backend: self.backend.clone(),
retrieve_dmq_contents: self.retrieve_dmq_contents.clone(), polkadot_client: self.polkadot_client.clone(),
} }
} }
} }
impl<Block, PF, BI, BS, Backend> Collator<Block, PF, BI, BS, Backend> impl<Block, PF, BI, BS, Backend, PBackend, PApi, PClient> Collator<Block, PF, BI, BS, Backend, PBackend, PClient>
where where
Block: BlockT, Block: BlockT,
PF: Environment<Block> + 'static + Send, PF: Environment<Block> + 'static + Send,
@@ -100,9 +102,14 @@ where
+ 'static, + 'static,
BS: BlockBackend<Block>, BS: BlockBackend<Block>,
Backend: sc_client_api::Backend<Block> + 'static, Backend: sc_client_api::Backend<Block> + 'static,
PBackend: sc_client_api::Backend<PBlock> + 'static,
PBackend::State: StateBackend<BlakeTwo256>,
PApi: RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: polkadot_service::AbstractClient<PBlock, PBackend, Api = PApi> + 'static,
{ {
/// Create a new instance. /// Create a new instance.
fn new( fn new(
para_id: ParaId,
proposer_factory: PF, proposer_factory: PF,
inherent_data_providers: InherentDataProviders, inherent_data_providers: InherentDataProviders,
overseer_handler: OverseerHandler, overseer_handler: OverseerHandler,
@@ -111,7 +118,7 @@ where
spawner: Arc<dyn SpawnNamed + Send + Sync>, spawner: Arc<dyn SpawnNamed + Send + Sync>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>, announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
backend: Arc<Backend>, backend: Arc<Backend>,
retrieve_dmq_contents: Arc<dyn Fn(PHash) -> Option<DownwardMessagesType> + Send + Sync>, polkadot_client: Arc<PClient>,
) -> Self { ) -> Self {
let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new( let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(
spawner, spawner,
@@ -120,6 +127,7 @@ where
))); )));
Self { Self {
para_id,
proposer_factory: Arc::new(Mutex::new(proposer_factory)), proposer_factory: Arc::new(Mutex::new(proposer_factory)),
inherent_data_providers, inherent_data_providers,
_phantom: PhantomData, _phantom: PhantomData,
@@ -127,10 +135,33 @@ where
block_status, block_status,
wait_to_announce, wait_to_announce,
backend, backend,
retrieve_dmq_contents, polkadot_client,
} }
} }
/// Returns the whole contents of the downward message queue for the parachain we are collating
/// for.
///
/// Returns `None` in case of an error.
fn retrieve_dmq_contents(&self, relay_parent: PHash) -> Option<DownwardMessagesType> {
self
.polkadot_client
.runtime_api()
.dmq_contents_with_context(
&BlockId::hash(relay_parent),
sp_core::ExecutionContext::Importing,
self.para_id,
)
.map_err(|e| {
error!(
target: "cumulus-collator",
"An error occured during requesting the downward messages for {}: {:?}",
relay_parent, e,
);
})
.ok()
}
/// Get the inherent data with validation function parameters injected /// Get the inherent data with validation function parameters injected
fn inherent_data( fn inherent_data(
&mut self, &mut self,
@@ -160,7 +191,7 @@ where
}) })
.ok()?; .ok()?;
let downward_messages = (self.retrieve_dmq_contents)(relay_parent)?; let downward_messages = self.retrieve_dmq_contents(relay_parent)?;
inherent_data inherent_data
.put_data(DOWNWARD_MESSAGES_IDENTIFIER, &downward_messages) .put_data(DOWNWARD_MESSAGES_IDENTIFIER, &downward_messages)
.map_err(|e| { .map_err(|e| {
@@ -459,36 +490,15 @@ where
for<'a> &'a Client: BlockImport<Block>, for<'a> &'a Client: BlockImport<Block>,
BS: BlockBackend<Block> + Send + Sync + 'static, BS: BlockBackend<Block> + Send + Sync + 'static,
Spawner: SpawnNamed + Clone + Send + Sync + 'static, Spawner: SpawnNamed + Clone + Send + Sync + 'static,
PBackend: sc_client_api::Backend<PBlock>, PBackend: sc_client_api::Backend<PBlock> + 'static,
PBackend::State: StateBackend<BlakeTwo256>, PBackend::State: StateBackend<BlakeTwo256>,
PApi: RuntimeApiCollection<StateBackend = PBackend::State>, PApi: RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: polkadot_service::AbstractClient<PBlock, PBackend, Api = PApi> + 'static, PClient: polkadot_service::AbstractClient<PBlock, PBackend, Api = PApi> + 'static,
{ {
let retrieve_dmq_contents = {
let polkadot_client = polkadot_client.clone();
move |relay_parent: PHash| {
polkadot_client
.runtime_api()
.dmq_contents_with_context(
&BlockId::hash(relay_parent),
sp_core::ExecutionContext::Importing,
para_id,
)
.map_err(|e| {
error!(
target: "cumulus-collator",
"An error occured during requesting the downward messages for {}: {:?}",
relay_parent, e,
);
})
.ok()
}
};
let follow = match cumulus_consensus::follow_polkadot( let follow = match cumulus_consensus::follow_polkadot(
para_id, para_id,
client, client,
polkadot_client, polkadot_client.clone(),
announce_block.clone(), announce_block.clone(),
) { ) {
Ok(follow) => follow, Ok(follow) => follow,
@@ -498,6 +508,7 @@ where
spawner.spawn("cumulus-follow-polkadot", follow.map(|_| ()).boxed()); spawner.spawn("cumulus-follow-polkadot", follow.map(|_| ()).boxed());
let collator = Collator::new( let collator = Collator::new(
para_id,
proposer_factory, proposer_factory,
inherent_data_providers, inherent_data_providers,
overseer_handler.clone(), overseer_handler.clone(),
@@ -506,7 +517,7 @@ where
Arc::new(spawner), Arc::new(spawner),
announce_block, announce_block,
backend, backend,
Arc::new(retrieve_dmq_contents), polkadot_client,
); );
let config = CollationGenerationConfig { let config = CollationGenerationConfig {