mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 09:21:04 +00:00
Simplify the bitfield signing job (#1920)
Besides that the pr also adds a simple test.
This commit is contained in:
Generated
+1
-1
@@ -5000,7 +5000,7 @@ dependencies = [
|
|||||||
name = "polkadot-node-core-bitfield-signing"
|
name = "polkadot-node-core-bitfield-signing"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitvec",
|
"derive_more",
|
||||||
"futures 0.3.5",
|
"futures 0.3.5",
|
||||||
"log 0.4.11",
|
"log 0.4.11",
|
||||||
"polkadot-node-subsystem",
|
"polkadot-node-subsystem",
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ authors = ["Parity Technologies <admin@parity.io>"]
|
|||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bitvec = "0.17.4"
|
|
||||||
futures = "0.3.5"
|
futures = "0.3.5"
|
||||||
log = "0.4.11"
|
log = "0.4.11"
|
||||||
polkadot-primitives = { path = "../../../primitives" }
|
polkadot-primitives = { path = "../../../primitives" }
|
||||||
@@ -14,3 +13,4 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
|
|||||||
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
wasm-timer = "0.2.4"
|
wasm-timer = "0.2.4"
|
||||||
thiserror = "1.0.21"
|
thiserror = "1.0.21"
|
||||||
|
derive_more = "0.99.11"
|
||||||
|
|||||||
@@ -16,20 +16,16 @@
|
|||||||
|
|
||||||
//! The bitfield signing subsystem produces `SignedAvailabilityBitfield`s once per block.
|
//! The bitfield signing subsystem produces `SignedAvailabilityBitfield`s once per block.
|
||||||
|
|
||||||
#![deny(unused_crate_dependencies, unused_results)]
|
#![deny(unused_crate_dependencies)]
|
||||||
#![warn(missing_docs)]
|
#![warn(missing_docs)]
|
||||||
|
#![recursion_limit="256"]
|
||||||
|
|
||||||
use bitvec::bitvec;
|
use futures::{channel::{mpsc, oneshot}, lock::Mutex, prelude::*, future, Future};
|
||||||
use futures::{
|
|
||||||
channel::{mpsc, oneshot},
|
|
||||||
prelude::*,
|
|
||||||
stream, Future,
|
|
||||||
};
|
|
||||||
use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
|
use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
|
||||||
use polkadot_node_subsystem::{
|
use polkadot_node_subsystem::{
|
||||||
messages::{
|
messages::{
|
||||||
self, AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
|
AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
|
||||||
BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage,
|
BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage, RuntimeApiRequest,
|
||||||
},
|
},
|
||||||
errors::RuntimeApiError,
|
errors::RuntimeApiError,
|
||||||
};
|
};
|
||||||
@@ -38,7 +34,7 @@ use polkadot_node_subsystem_util::{
|
|||||||
metrics::{self, prometheus},
|
metrics::{self, prometheus},
|
||||||
};
|
};
|
||||||
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
|
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
|
||||||
use std::{convert::TryFrom, pin::Pin, time::Duration};
|
use std::{convert::TryFrom, pin::Pin, time::Duration, iter::FromIterator};
|
||||||
use wasm_timer::{Delay, Instant};
|
use wasm_timer::{Delay, Instant};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
@@ -85,6 +81,7 @@ impl From<BitfieldSigningMessage> for ToJob {
|
|||||||
|
|
||||||
/// Messages which may be sent from a `BitfieldSigningJob`.
|
/// Messages which may be sent from a `BitfieldSigningJob`.
|
||||||
#[allow(missing_docs)]
|
#[allow(missing_docs)]
|
||||||
|
#[derive(Debug, derive_more::From)]
|
||||||
pub enum FromJob {
|
pub enum FromJob {
|
||||||
AvailabilityStore(AvailabilityStoreMessage),
|
AvailabilityStore(AvailabilityStoreMessage),
|
||||||
BitfieldDistribution(BitfieldDistributionMessage),
|
BitfieldDistribution(BitfieldDistributionMessage),
|
||||||
@@ -132,9 +129,6 @@ pub enum Error {
|
|||||||
/// a mspc channel failed to send
|
/// a mspc channel failed to send
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
MpscSend(#[from] mpsc::SendError),
|
MpscSend(#[from] mpsc::SendError),
|
||||||
/// several errors collected into one
|
|
||||||
#[error("Multiple errours occured: {0:?}")]
|
|
||||||
Multiple(Vec<Error>),
|
|
||||||
/// the runtime API failed to return what we wanted
|
/// the runtime API failed to return what we wanted
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
Runtime(#[from] RuntimeApiError),
|
Runtime(#[from] RuntimeApiError),
|
||||||
@@ -143,31 +137,25 @@ pub enum Error {
|
|||||||
Keystore(KeystoreError),
|
Keystore(KeystoreError),
|
||||||
}
|
}
|
||||||
|
|
||||||
// if there is a candidate pending availability, query the Availability Store
|
/// If there is a candidate pending availability, query the Availability Store
|
||||||
// for whether we have the availability chunk for our validator index.
|
/// for whether we have the availability chunk for our validator index.
|
||||||
async fn get_core_availability(
|
async fn get_core_availability(
|
||||||
relay_parent: Hash,
|
relay_parent: Hash,
|
||||||
core: CoreState,
|
core: CoreState,
|
||||||
validator_idx: ValidatorIndex,
|
validator_idx: ValidatorIndex,
|
||||||
sender: &mpsc::Sender<FromJob>,
|
sender: &Mutex<&mut mpsc::Sender<FromJob>>,
|
||||||
) -> Result<bool, Error> {
|
) -> Result<bool, Error> {
|
||||||
use messages::{
|
|
||||||
AvailabilityStoreMessage::QueryChunkAvailability,
|
|
||||||
RuntimeApiRequest::CandidatePendingAvailability,
|
|
||||||
};
|
|
||||||
use FromJob::{AvailabilityStore, RuntimeApi};
|
|
||||||
use RuntimeApiMessage::Request;
|
|
||||||
|
|
||||||
// we have to (cheaply) clone this sender so we can mutate it to actually send anything
|
|
||||||
let mut sender = sender.clone();
|
|
||||||
|
|
||||||
if let CoreState::Occupied(core) = core {
|
if let CoreState::Occupied(core) = core {
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
sender
|
sender
|
||||||
.send(RuntimeApi(Request(
|
.lock()
|
||||||
relay_parent,
|
.await
|
||||||
CandidatePendingAvailability(core.para_id, tx),
|
.send(
|
||||||
)))
|
RuntimeApiMessage::Request(
|
||||||
|
relay_parent,
|
||||||
|
RuntimeApiRequest::CandidatePendingAvailability(core.para_id, tx),
|
||||||
|
).into(),
|
||||||
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let committed_candidate_receipt = match rx.await? {
|
let committed_candidate_receipt = match rx.await? {
|
||||||
@@ -181,27 +169,26 @@ async fn get_core_availability(
|
|||||||
};
|
};
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
sender
|
sender
|
||||||
.send(AvailabilityStore(QueryChunkAvailability(
|
.lock()
|
||||||
committed_candidate_receipt.descriptor.pov_hash,
|
.await
|
||||||
validator_idx,
|
.send(
|
||||||
tx,
|
AvailabilityStoreMessage::QueryChunkAvailability(
|
||||||
)))
|
committed_candidate_receipt.descriptor.pov_hash,
|
||||||
|
validator_idx,
|
||||||
|
tx,
|
||||||
|
).into(),
|
||||||
|
)
|
||||||
.await?;
|
.await?;
|
||||||
return rx.await.map_err(Into::into);
|
return rx.await.map_err(Into::into);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// delegates to the v1 runtime API
|
/// delegates to the v1 runtime API
|
||||||
async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender<FromJob>) -> Result<Vec<CoreState>, Error> {
|
async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender<FromJob>) -> Result<Vec<CoreState>, Error> {
|
||||||
use FromJob::RuntimeApi;
|
|
||||||
use messages::{
|
|
||||||
RuntimeApiMessage::Request,
|
|
||||||
RuntimeApiRequest::AvailabilityCores,
|
|
||||||
};
|
|
||||||
|
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
sender.send(RuntimeApi(Request(relay_parent, AvailabilityCores(tx)))).await?;
|
sender.send(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::AvailabilityCores(tx)).into()).await?;
|
||||||
match rx.await {
|
match rx.await {
|
||||||
Ok(Ok(out)) => Ok(out),
|
Ok(Ok(out)) => Ok(out),
|
||||||
Ok(Err(runtime_err)) => Err(runtime_err.into()),
|
Ok(Err(runtime_err)) => Err(runtime_err.into()),
|
||||||
@@ -209,57 +196,28 @@ async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender<Fr
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// - get the list of core states from the runtime
|
/// - get the list of core states from the runtime
|
||||||
// - for each core, concurrently determine chunk availability (see `get_core_availability`)
|
/// - for each core, concurrently determine chunk availability (see `get_core_availability`)
|
||||||
// - return the bitfield if there were no errors at any point in this process
|
/// - return the bitfield if there were no errors at any point in this process
|
||||||
// (otherwise, it's prone to false negatives)
|
/// (otherwise, it's prone to false negatives)
|
||||||
async fn construct_availability_bitfield(
|
async fn construct_availability_bitfield(
|
||||||
relay_parent: Hash,
|
relay_parent: Hash,
|
||||||
validator_idx: ValidatorIndex,
|
validator_idx: ValidatorIndex,
|
||||||
sender: &mut mpsc::Sender<FromJob>,
|
sender: &mut mpsc::Sender<FromJob>,
|
||||||
) -> Result<AvailabilityBitfield, Error> {
|
) -> Result<AvailabilityBitfield, Error> {
|
||||||
use futures::lock::Mutex;
|
|
||||||
|
|
||||||
// get the set of availability cores from the runtime
|
// get the set of availability cores from the runtime
|
||||||
let availability_cores = get_availability_cores(relay_parent, sender).await?;
|
let availability_cores = get_availability_cores(relay_parent, sender).await?;
|
||||||
|
|
||||||
// we now need sender to be immutable so we can copy the reference to multiple concurrent closures
|
// Wrap the sender in a Mutex to share it between the futures.
|
||||||
let sender = &*sender;
|
let sender = Mutex::new(sender);
|
||||||
|
|
||||||
// prepare outputs
|
// Handle all cores concurrently
|
||||||
let out = Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; availability_cores.len()));
|
// `try_join_all` returns all results in the same order as the input futures.
|
||||||
// in principle, we know that we never want concurrent access to the _same_ bit within the vec;
|
let results = future::try_join_all(
|
||||||
// we could `let out_ref = out.as_mut_ptr();` here instead, and manually assign bits, avoiding
|
availability_cores.into_iter().map(|core| get_core_availability(relay_parent, core, validator_idx, &sender)),
|
||||||
// any need to ever wait to lock this mutex.
|
).await?;
|
||||||
// in practice, it's safer to just use the mutex, and speed optimizations should wait until
|
|
||||||
// benchmarking proves that they are necessary.
|
|
||||||
let out_ref = &out;
|
|
||||||
let errs = Mutex::new(Vec::new());
|
|
||||||
let errs_ref = &errs;
|
|
||||||
|
|
||||||
// Handle each (idx, core) pair concurrently
|
Ok(AvailabilityBitfield(FromIterator::from_iter(results)))
|
||||||
//
|
|
||||||
// In principle, this work is all concurrent, not parallel. In practice, we can't guarantee it, which is why
|
|
||||||
// we need the mutexes and explicit references above.
|
|
||||||
stream::iter(availability_cores.into_iter().enumerate())
|
|
||||||
.for_each_concurrent(None, |(idx, core)| async move {
|
|
||||||
let availability = match get_core_availability(relay_parent, core, validator_idx, sender).await {
|
|
||||||
Ok(availability) => availability,
|
|
||||||
Err(err) => {
|
|
||||||
errs_ref.lock().await.push(err);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
out_ref.lock().await.set(idx, availability);
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let errs = errs.into_inner();
|
|
||||||
if errs.is_empty() {
|
|
||||||
Ok(out.into_inner().into())
|
|
||||||
} else {
|
|
||||||
Err(Error::Multiple(errs.into()))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@@ -312,7 +270,6 @@ impl JobTrait for BitfieldSigningJob {
|
|||||||
mut sender: mpsc::Sender<FromJob>,
|
mut sender: mpsc::Sender<FromJob>,
|
||||||
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
|
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
|
||||||
async move {
|
async move {
|
||||||
// figure out when to wait to
|
|
||||||
let wait_until = Instant::now() + JOB_DELAY;
|
let wait_until = Instant::now() + JOB_DELAY;
|
||||||
|
|
||||||
// now do all the work we can before we need to wait for the availability store
|
// now do all the work we can before we need to wait for the availability store
|
||||||
@@ -344,24 +301,83 @@ impl JobTrait for BitfieldSigningJob {
|
|||||||
.map_err(|e| Error::Keystore(e))?;
|
.map_err(|e| Error::Keystore(e))?;
|
||||||
metrics.on_bitfield_signed();
|
metrics.on_bitfield_signed();
|
||||||
|
|
||||||
// make an anonymous scope to contain some use statements to simplify creating the outbound message
|
sender
|
||||||
{
|
.send(BitfieldDistributionMessage::DistributeBitfield(relay_parent, signed_bitfield).into())
|
||||||
use BitfieldDistributionMessage::DistributeBitfield;
|
.await
|
||||||
use FromJob::BitfieldDistribution;
|
.map_err(Into::into)
|
||||||
|
|
||||||
sender
|
|
||||||
.send(BitfieldDistribution(DistributeBitfield(
|
|
||||||
relay_parent,
|
|
||||||
signed_bitfield,
|
|
||||||
)))
|
|
||||||
.await
|
|
||||||
.map_err(Into::into)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// BitfieldSigningSubsystem manages a number of bitfield signing jobs.
|
/// BitfieldSigningSubsystem manages a number of bitfield signing jobs.
|
||||||
pub type BitfieldSigningSubsystem<Spawner, Context> =
|
pub type BitfieldSigningSubsystem<Spawner, Context> = JobManager<Spawner, Context, BitfieldSigningJob>;
|
||||||
JobManager<Spawner, Context, BitfieldSigningJob>;
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use futures::{pin_mut, executor::block_on};
|
||||||
|
use polkadot_primitives::v1::{OccupiedCore};
|
||||||
|
use FromJob::*;
|
||||||
|
|
||||||
|
fn occupied_core(para_id: u32) -> CoreState {
|
||||||
|
CoreState::Occupied(OccupiedCore {
|
||||||
|
para_id: para_id.into(),
|
||||||
|
group_responsible: para_id.into(),
|
||||||
|
next_up_on_available: None,
|
||||||
|
occupied_since: 100_u32,
|
||||||
|
time_out_at: 200_u32,
|
||||||
|
next_up_on_time_out: None,
|
||||||
|
availability: Default::default(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn construct_availability_bitfield_works() {
|
||||||
|
block_on(async move {
|
||||||
|
let (mut sender, mut receiver) = mpsc::channel(10);
|
||||||
|
let relay_parent = Hash::default();
|
||||||
|
let validator_index = 1u32;
|
||||||
|
|
||||||
|
let future = construct_availability_bitfield(relay_parent, validator_index, &mut sender).fuse();
|
||||||
|
pin_mut!(future);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
futures::select! {
|
||||||
|
m = receiver.next() => match m.unwrap() {
|
||||||
|
RuntimeApi(RuntimeApiMessage::Request(rp, RuntimeApiRequest::AvailabilityCores(tx))) => {
|
||||||
|
assert_eq!(relay_parent, rp);
|
||||||
|
tx.send(Ok(vec![CoreState::Free, occupied_core(1), occupied_core(2)])).unwrap();
|
||||||
|
},
|
||||||
|
RuntimeApi(
|
||||||
|
RuntimeApiMessage::Request(rp, RuntimeApiRequest::CandidatePendingAvailability(para_id, tx))
|
||||||
|
) => {
|
||||||
|
assert_eq!(relay_parent, rp);
|
||||||
|
|
||||||
|
if para_id == 1.into() {
|
||||||
|
tx.send(Ok(Some(Default::default()))).unwrap();
|
||||||
|
} else {
|
||||||
|
tx.send(Ok(None)).unwrap();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
AvailabilityStore(AvailabilityStoreMessage::QueryChunkAvailability(_, vidx, tx)) => {
|
||||||
|
assert_eq!(validator_index, vidx);
|
||||||
|
|
||||||
|
tx.send(true).unwrap();
|
||||||
|
},
|
||||||
|
o => panic!("Unknown message: {:?}", o),
|
||||||
|
},
|
||||||
|
r = future => match r {
|
||||||
|
Ok(r) => {
|
||||||
|
assert!(!r.0.get(0).unwrap());
|
||||||
|
assert!(r.0.get(1).unwrap());
|
||||||
|
assert!(!r.0.get(2).unwrap());
|
||||||
|
break
|
||||||
|
},
|
||||||
|
Err(e) => panic!("Failed: {:?}", e),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user