pow: add Version for quick-check of metadata state and refactor lock handling (#9698)

* pow: add Version for quick-check of metadata state and refactor lock handling

* typo: mut self -> self

* Run rustfmt

* typo: grammar
This commit is contained in:
Wei Tang
2021-10-12 05:50:21 +02:00
committed by GitHub
parent acd39cbc6c
commit 582ac8f932
2 changed files with 153 additions and 77 deletions
+6 -12
View File
@@ -41,13 +41,12 @@
mod worker; mod worker;
pub use crate::worker::{MiningBuild, MiningMetadata, MiningWorker}; pub use crate::worker::{MiningBuild, MiningHandle, MiningMetadata};
use crate::worker::UntilImportedOrTimeout; use crate::worker::UntilImportedOrTimeout;
use codec::{Decode, Encode}; use codec::{Decode, Encode};
use futures::{Future, StreamExt}; use futures::{Future, StreamExt};
use log::*; use log::*;
use parking_lot::Mutex;
use prometheus_endpoint::Registry; use prometheus_endpoint::Registry;
use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents}; use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents};
use sc_consensus::{ use sc_consensus::{
@@ -525,7 +524,7 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, L, CIDP, CAW>(
build_time: Duration, build_time: Duration,
can_author_with: CAW, can_author_with: CAW,
) -> ( ) -> (
Arc<Mutex<MiningWorker<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>>>, MiningHandle<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>,
impl Future<Output = ()>, impl Future<Output = ()>,
) )
where where
@@ -543,12 +542,7 @@ where
CAW: CanAuthorWith<Block> + Clone + Send + 'static, CAW: CanAuthorWith<Block> + Clone + Send + 'static,
{ {
let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout); let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
let worker = Arc::new(Mutex::new(MiningWorker { let worker = MiningHandle::new(algorithm.clone(), block_import, justification_sync_link);
build: None,
algorithm: algorithm.clone(),
block_import,
justification_sync_link,
}));
let worker_ret = worker.clone(); let worker_ret = worker.clone();
let task = async move { let task = async move {
@@ -559,7 +553,7 @@ where
if sync_oracle.is_major_syncing() { if sync_oracle.is_major_syncing() {
debug!(target: "pow", "Skipping proposal due to sync."); debug!(target: "pow", "Skipping proposal due to sync.");
worker.lock().on_major_syncing(); worker.on_major_syncing();
continue continue
} }
@@ -587,7 +581,7 @@ where
continue continue
} }
if worker.lock().best_hash() == Some(best_hash) { if worker.best_hash() == Some(best_hash) {
continue continue
} }
@@ -682,7 +676,7 @@ where
proposal, proposal,
}; };
worker.lock().on_build(build); worker.on_build(build);
} }
}; };
+147 -65
View File
@@ -22,6 +22,7 @@ use futures::{
}; };
use futures_timer::Delay; use futures_timer::Delay;
use log::*; use log::*;
use parking_lot::Mutex;
use sc_client_api::ImportNotifications; use sc_client_api::ImportNotifications;
use sc_consensus::{BlockImportParams, BoxBlockImport, StateAction, StorageChanges}; use sc_consensus::{BlockImportParams, BoxBlockImport, StateAction, StorageChanges};
use sp_consensus::{BlockOrigin, Proposal}; use sp_consensus::{BlockOrigin, Proposal};
@@ -30,7 +31,16 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT}, traits::{Block as BlockT, Header as HeaderT},
DigestItem, DigestItem,
}; };
use std::{borrow::Cow, collections::HashMap, pin::Pin, time::Duration}; use std::{
borrow::Cow,
collections::HashMap,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use crate::{PowAlgorithm, PowIntermediate, Seal, INTERMEDIATE_KEY, POW_ENGINE_ID}; use crate::{PowAlgorithm, PowIntermediate, Seal, INTERMEDIATE_KEY, POW_ENGINE_ID};
@@ -60,21 +70,26 @@ pub struct MiningBuild<
pub proposal: Proposal<Block, sp_api::TransactionFor<C, Block>, Proof>, pub proposal: Proposal<Block, sp_api::TransactionFor<C, Block>, Proof>,
} }
/// Version of the mining worker.
#[derive(Eq, PartialEq, Clone, Copy)]
pub struct Version(usize);
/// Mining worker that exposes structs to query the current mining build and submit mined blocks. /// Mining worker that exposes structs to query the current mining build and submit mined blocks.
pub struct MiningWorker< pub struct MiningHandle<
Block: BlockT, Block: BlockT,
Algorithm: PowAlgorithm<Block>, Algorithm: PowAlgorithm<Block>,
C: sp_api::ProvideRuntimeApi<Block>, C: sp_api::ProvideRuntimeApi<Block>,
L: sc_consensus::JustificationSyncLink<Block>, L: sc_consensus::JustificationSyncLink<Block>,
Proof, Proof,
> { > {
pub(crate) build: Option<MiningBuild<Block, Algorithm, C, Proof>>, version: Arc<AtomicUsize>,
pub(crate) algorithm: Algorithm, algorithm: Arc<Algorithm>,
pub(crate) block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>, justification_sync_link: Arc<L>,
pub(crate) justification_sync_link: L, build: Arc<Mutex<Option<MiningBuild<Block, Algorithm, C, Proof>>>>,
block_import: Arc<Mutex<BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>>>,
} }
impl<Block, Algorithm, C, L, Proof> MiningWorker<Block, Algorithm, C, L, Proof> impl<Block, Algorithm, C, L, Proof> MiningHandle<Block, Algorithm, C, L, Proof>
where where
Block: BlockT, Block: BlockT,
C: sp_api::ProvideRuntimeApi<Block>, C: sp_api::ProvideRuntimeApi<Block>,
@@ -83,35 +98,65 @@ where
L: sc_consensus::JustificationSyncLink<Block>, L: sc_consensus::JustificationSyncLink<Block>,
sp_api::TransactionFor<C, Block>: Send + 'static, sp_api::TransactionFor<C, Block>: Send + 'static,
{ {
fn increment_version(&self) {
self.version.fetch_add(1, Ordering::SeqCst);
}
pub(crate) fn new(
algorithm: Algorithm,
block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
justification_sync_link: L,
) -> Self {
Self {
version: Arc::new(AtomicUsize::new(0)),
algorithm: Arc::new(algorithm),
justification_sync_link: Arc::new(justification_sync_link),
build: Arc::new(Mutex::new(None)),
block_import: Arc::new(Mutex::new(block_import)),
}
}
pub(crate) fn on_major_syncing(&self) {
let mut build = self.build.lock();
*build = None;
self.increment_version();
}
pub(crate) fn on_build(&self, value: MiningBuild<Block, Algorithm, C, Proof>) {
let mut build = self.build.lock();
*build = Some(value);
self.increment_version();
}
/// Get the version of the mining worker.
///
/// This returns type `Version` which can only compare equality. If `Version` is unchanged, then
/// it can be certain that `best_hash` and `metadata` were not changed.
pub fn version(&self) -> Version {
Version(self.version.load(Ordering::SeqCst))
}
/// Get the current best hash. `None` if the worker has just started or the client is doing /// Get the current best hash. `None` if the worker has just started or the client is doing
/// major syncing. /// major syncing.
pub fn best_hash(&self) -> Option<Block::Hash> { pub fn best_hash(&self) -> Option<Block::Hash> {
self.build.as_ref().map(|b| b.metadata.best_hash) self.build.lock().as_ref().map(|b| b.metadata.best_hash)
}
pub(crate) fn on_major_syncing(&mut self) {
self.build = None;
}
pub(crate) fn on_build(&mut self, build: MiningBuild<Block, Algorithm, C, Proof>) {
self.build = Some(build);
} }
/// Get a copy of the current mining metadata, if available. /// Get a copy of the current mining metadata, if available.
pub fn metadata(&self) -> Option<MiningMetadata<Block::Hash, Algorithm::Difficulty>> { pub fn metadata(&self) -> Option<MiningMetadata<Block::Hash, Algorithm::Difficulty>> {
self.build.as_ref().map(|b| b.metadata.clone()) self.build.lock().as_ref().map(|b| b.metadata.clone())
} }
/// Submit a mined seal. The seal will be validated again. Returns true if the submission is /// Submit a mined seal. The seal will be validated again. Returns true if the submission is
/// successful. /// successful.
pub async fn submit(&mut self, seal: Seal) -> bool { pub async fn submit(&self, seal: Seal) -> bool {
if let Some(build) = self.build.take() { if let Some(metadata) = self.metadata() {
match self.algorithm.verify( match self.algorithm.verify(
&BlockId::Hash(build.metadata.best_hash), &BlockId::Hash(metadata.best_hash),
&build.metadata.pre_hash, &metadata.pre_hash,
build.metadata.pre_runtime.as_ref().map(|v| &v[..]), metadata.pre_runtime.as_ref().map(|v| &v[..]),
&seal, &seal,
build.metadata.difficulty, metadata.difficulty,
) { ) {
Ok(true) => (), Ok(true) => (),
Ok(false) => { Ok(false) => {
@@ -130,55 +175,92 @@ where
return false return false
}, },
} }
} else {
warn!(
target: "pow",
"Unable to import mined block: metadata does not exist",
);
return false
}
let seal = DigestItem::Seal(POW_ENGINE_ID, seal); let build = if let Some(build) = {
let (header, body) = build.proposal.block.deconstruct(); let mut build = self.build.lock();
let value = build.take();
let mut import_block = BlockImportParams::new(BlockOrigin::Own, header); if value.is_some() {
import_block.post_digests.push(seal); self.increment_version();
import_block.body = Some(body);
import_block.state_action =
StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));
let intermediate = PowIntermediate::<Algorithm::Difficulty> {
difficulty: Some(build.metadata.difficulty),
};
import_block
.intermediates
.insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>);
let header = import_block.post_header();
match self.block_import.import_block(import_block, HashMap::default()).await {
Ok(res) => {
res.handle_justification(
&header.hash(),
*header.number(),
&mut self.justification_sync_link,
);
info!(
target: "pow",
"✅ Successfully mined block on top of: {}",
build.metadata.best_hash
);
true
},
Err(err) => {
warn!(
target: "pow",
"Unable to import mined block: {:?}",
err,
);
false
},
} }
value
} {
build
} else { } else {
warn!( warn!(
target: "pow", target: "pow",
"Unable to import mined block: build does not exist", "Unable to import mined block: build does not exist",
); );
false return false
};
let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
let (header, body) = build.proposal.block.deconstruct();
let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(seal);
import_block.body = Some(body);
import_block.state_action =
StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));
let intermediate = PowIntermediate::<Algorithm::Difficulty> {
difficulty: Some(build.metadata.difficulty),
};
import_block
.intermediates
.insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>);
let header = import_block.post_header();
let mut block_import = self.block_import.lock();
match block_import.import_block(import_block, HashMap::default()).await {
Ok(res) => {
res.handle_justification(
&header.hash(),
*header.number(),
&self.justification_sync_link,
);
info!(
target: "pow",
"✅ Successfully mined block on top of: {}",
build.metadata.best_hash
);
true
},
Err(err) => {
warn!(
target: "pow",
"Unable to import mined block: {:?}",
err,
);
false
},
}
}
}
impl<Block, Algorithm, C, L, Proof> Clone for MiningHandle<Block, Algorithm, C, L, Proof>
where
Block: BlockT,
Algorithm: PowAlgorithm<Block>,
C: sp_api::ProvideRuntimeApi<Block>,
L: sc_consensus::JustificationSyncLink<Block>,
{
fn clone(&self) -> Self {
Self {
version: self.version.clone(),
algorithm: self.algorithm.clone(),
justification_sync_link: self.justification_sync_link.clone(),
build: self.build.clone(),
block_import: self.block_import.clone(),
} }
} }
} }