bitfield-signing: remove util::jobs usage (#5523)

This commit is contained in:
Chris Sosnin
2022-05-19 14:01:22 +03:00
committed by GitHub
parent 32770a4383
commit f1006f5a43
3 changed files with 160 additions and 125 deletions
+157 -118
View File
@@ -24,22 +24,21 @@ use futures::{
channel::{mpsc, oneshot},
future,
lock::Mutex,
prelude::*,
Future,
FutureExt,
};
use polkadot_node_subsystem::{
errors::RuntimeApiError,
jaeger,
messages::{
AvailabilityStoreMessage, BitfieldDistributionMessage, BitfieldSigningMessage,
RuntimeApiMessage, RuntimeApiRequest,
AvailabilityStoreMessage, BitfieldDistributionMessage, RuntimeApiMessage, RuntimeApiRequest,
},
overseer, ActivatedLeaf, LeafStatus, PerLeafSpan, SubsystemSender,
overseer, ActivatedLeaf, FromOverseer, LeafStatus, OverseerSignal, PerLeafSpan,
SpawnedSubsystem, SubsystemError, SubsystemResult, SubsystemSender,
};
use polkadot_node_subsystem_util::{self as util, JobSender, JobSubsystem, JobTrait, Validator};
use polkadot_node_subsystem_util::{self as util, Validator};
use polkadot_primitives::v2::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
use std::{iter::FromIterator, pin::Pin, time::Duration};
use std::{collections::HashMap, iter::FromIterator, time::Duration};
use wasm_timer::{Delay, Instant};
mod metrics;
@@ -49,12 +48,10 @@ use self::metrics::Metrics;
mod tests;
/// Delay between the start of a bitfield signing job and its attempt to create a bitfield.
const JOB_DELAY: Duration = Duration::from_millis(1500);
const SPAWNED_TASK_DELAY: Duration = Duration::from_millis(1500);
const LOG_TARGET: &str = "parachain::bitfield-signing";
/// Each `BitfieldSigningJob` prepares a signed bitfield for a single relay parent.
pub struct BitfieldSigningJob<Sender>(std::marker::PhantomData<Sender>);
// TODO: use `fatality` (https://github.com/paritytech/polkadot/issues/5540).
/// Errors we may encounter in the course of executing the `BitfieldSigningSubsystem`.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
@@ -182,114 +179,156 @@ async fn construct_availability_bitfield(
Ok(AvailabilityBitfield(core_bits))
}
impl<Sender> JobTrait for BitfieldSigningJob<Sender>
where
Sender: overseer::BitfieldSigningSenderTrait + Unpin,
{
type ToJob = BitfieldSigningMessage;
type OutgoingMessages = overseer::BitfieldSigningOutgoingMessages;
type Sender = Sender;
type Error = Error;
type RunArgs = SyncCryptoStorePtr;
type Metrics = Metrics;
/// The bitfield signing subsystem.
///
/// Carries the two values that are handed to the subsystem's `run` loop when the
/// subsystem is started.
pub struct BitfieldSigningSubsystem {
/// Keystore handle; cloned for every activated leaf and used to construct the
/// `Validator` and to sign the bitfield.
keystore: SyncCryptoStorePtr,
/// Metrics handle; cloned for every activated leaf and used to time the signing
/// work and to record each signed bitfield.
metrics: Metrics,
}
const NAME: &'static str = "bitfield-signing-job";
/// Run a job for the parent block indicated
fn run(
leaf: ActivatedLeaf,
keystore: Self::RunArgs,
metrics: Self::Metrics,
_receiver: mpsc::Receiver<BitfieldSigningMessage>,
mut sender: JobSender<Sender>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let metrics = metrics.clone();
async move {
if let LeafStatus::Stale = leaf.status {
gum::debug!(
target: LOG_TARGET,
hash = ?leaf.hash,
block_number = ?leaf.number,
"Stale leaf - don't sign bitfields."
);
return Ok(())
}
let span = PerLeafSpan::new(leaf.span, "bitfield-signing");
let _span = span.child("delay");
let wait_until = Instant::now() + JOB_DELAY;
// now do all the work we can before we need to wait for the availability store
// if we're not a validator, we can just succeed effortlessly
let validator = match Validator::new(leaf.hash, keystore.clone(), &mut sender).await {
Ok(validator) => validator,
Err(util::Error::NotAValidator) => return Ok(()),
Err(err) => return Err(Error::Util(err)),
};
// wait a bit before doing anything else
Delay::new_at(wait_until).await?;
// this timer does not appear at the head of the function because we don't want to include
// JOB_DELAY each time.
let _timer = metrics.time_run();
drop(_span);
let span_availability = span.child("availability");
let bitfield = match construct_availability_bitfield(
leaf.hash,
&span_availability,
validator.index(),
sender.subsystem_sender(),
)
.await
{
Err(Error::Runtime(runtime_err)) => {
// Don't take down the node on runtime API errors.
gum::warn!(target: LOG_TARGET, err = ?runtime_err, "Encountered a runtime API error");
return Ok(())
},
Err(err) => return Err(err),
Ok(bitfield) => bitfield,
};
drop(span_availability);
let _span = span.child("signing");
let signed_bitfield = match validator
.sign(keystore.clone(), bitfield)
.await
.map_err(|e| Error::Keystore(e))?
{
Some(b) => b,
None => {
gum::error!(
target: LOG_TARGET,
"Key was found at construction, but while signing it could not be found.",
);
return Ok(())
},
};
metrics.on_bitfield_signed();
drop(_span);
let _span = span.child("gossip");
sender
.send_message(BitfieldDistributionMessage::DistributeBitfield(
leaf.hash,
signed_bitfield,
))
.await;
Ok(())
}
.boxed()
impl BitfieldSigningSubsystem {
/// Create a new instance of the `BitfieldSigningSubsystem`.
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
Self { keystore, metrics }
}
}
/// `BitfieldSigningSubsystem` manages a number of bitfield signing jobs.
pub type BitfieldSigningSubsystem<Spawner, Sender> =
JobSubsystem<BitfieldSigningJob<Sender>, Spawner>;
#[overseer::subsystem(BitfieldSigning, error=SubsystemError, prefix=self::overseer)]
impl<Context> BitfieldSigningSubsystem {
	/// Consume the subsystem and package its main loop as a `SpawnedSubsystem`.
	///
	/// The returned future drives `run` with the stored keystore and metrics. Any
	/// error coming out of `run` is wrapped with the origin `"bitfield-signing"`.
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let Self { keystore, metrics } = self;

		let future = async move {
			match run(ctx, keystore, metrics).await {
				Ok(done) => Ok(done),
				Err(e) => Err(SubsystemError::with_origin("bitfield-signing", e)),
			}
		}
		.boxed();

		SpawnedSubsystem { name: "bitfield-signing-subsystem", future }
	}
}
#[overseer::contextbounds(BitfieldSigning, prefix = self::overseer)]
async fn run<Context>(
mut ctx: Context,
keystore: SyncCryptoStorePtr,
metrics: Metrics,
) -> SubsystemResult<()> {
// Track spawned jobs per active leaf.
let mut running = HashMap::<Hash, future::AbortHandle>::new();
loop {
match ctx.recv().await? {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
// Abort jobs for deactivated leaves.
for leaf in &update.deactivated {
if let Some(handle) = running.remove(leaf) {
handle.abort();
}
}
for leaf in update.activated {
let sender = ctx.sender().clone();
let leaf_hash = leaf.hash;
let (fut, handle) = future::abortable(handle_active_leaves_update(
sender,
leaf,
keystore.clone(),
metrics.clone(),
));
running.insert(leaf_hash, handle);
ctx.spawn("bitfield-signing-job", fut.map(drop).boxed())?;
}
},
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOverseer::Communication { .. } => {},
}
}
}
async fn handle_active_leaves_update<Sender>(
mut sender: Sender,
leaf: ActivatedLeaf,
keystore: SyncCryptoStorePtr,
metrics: Metrics,
) -> Result<(), Error>
where
Sender: overseer::BitfieldSigningSenderTrait,
{
if let LeafStatus::Stale = leaf.status {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?leaf.hash,
block_number = ?leaf.number,
"Skip bitfield signing for stale leaf"
);
return Ok(())
}
let span = PerLeafSpan::new(leaf.span, "bitfield-signing");
let span_delay = span.child("delay");
let wait_until = Instant::now() + SPAWNED_TASK_DELAY;
// now do all the work we can before we need to wait for the availability store
// if we're not a validator, we can just succeed effortlessly
let validator = match Validator::new(leaf.hash, keystore.clone(), &mut sender).await {
Ok(validator) => validator,
Err(util::Error::NotAValidator) => return Ok(()),
Err(err) => return Err(Error::Util(err)),
};
// wait a bit before doing anything else
Delay::new_at(wait_until).await?;
// this timer does not appear at the head of the function because we don't want to include
// SPAWNED_TASK_DELAY each time.
let _timer = metrics.time_run();
drop(span_delay);
let span_availability = span.child("availability");
let bitfield = match construct_availability_bitfield(
leaf.hash,
&span_availability,
validator.index(),
&mut sender,
)
.await
{
Err(Error::Runtime(runtime_err)) => {
// Don't take down the node on runtime API errors.
gum::warn!(target: LOG_TARGET, err = ?runtime_err, "Encountered a runtime API error");
return Ok(())
},
Err(err) => return Err(err),
Ok(bitfield) => bitfield,
};
drop(span_availability);
let span_signing = span.child("signing");
let signed_bitfield =
match validator.sign(keystore, bitfield).await.map_err(|e| Error::Keystore(e))? {
Some(b) => b,
None => {
gum::error!(
target: LOG_TARGET,
"Key was found at construction, but while signing it could not be found.",
);
return Ok(())
},
};
metrics.on_bitfield_signed();
drop(span_signing);
let _span_gossip = span.child("gossip");
sender
.send_message(BitfieldDistributionMessage::DistributeBitfield(leaf.hash, signed_bitfield))
.await;
Ok(())
}
@@ -15,7 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use futures::{executor::block_on, pin_mut};
use futures::{executor::block_on, pin_mut, StreamExt};
use polkadot_node_subsystem::messages::AllMessages;
use polkadot_primitives::v2::{CandidateHash, OccupiedCore};
use test_helpers::dummy_candidate_descriptor;
+2 -6
View File
@@ -24,7 +24,7 @@ use polkadot_node_core_chain_selection::Config as ChainSelectionConfig;
use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig;
use polkadot_node_core_provisioner::ProvisionerConfig;
use polkadot_node_network_protocol::request_response::{v1 as request_v1, IncomingRequestReceiver};
use polkadot_node_subsystem_types::messages::{BitfieldSigningMessage, ProvisionerMessage};
use polkadot_node_subsystem_types::messages::ProvisionerMessage;
#[cfg(any(feature = "malus", test))]
pub use polkadot_overseer::{
dummy::{dummy_overseer_builder, DummySubsystem},
@@ -154,10 +154,7 @@ pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>(
StatementDistributionSubsystem<rand::rngs::StdRng>,
AvailabilityDistributionSubsystem,
AvailabilityRecoverySubsystem,
BitfieldSigningSubsystem<
Spawner,
<OverseerSubsystemContext<BitfieldSigningMessage> as SubsystemContext>::Sender,
>,
BitfieldSigningSubsystem,
BitfieldDistributionSubsystem,
ProvisionerSubsystem<
Spawner,
@@ -207,7 +204,6 @@ where
))
.bitfield_distribution(BitfieldDistributionSubsystem::new(Metrics::register(registry)?))
.bitfield_signing(BitfieldSigningSubsystem::new(
spawner.clone(),
keystore.clone(),
Metrics::register(registry)?,
))