mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-15 22:01:04 +00:00
Fix bitfield signing (#1466)
* Apply suggestions from #1364 code review - use CoreState, not CoreOccupied - query for availability chunks, not the whole PoV - create a stub `fn availability_cores` * link to issue documenting unimplemented * implement get_availability_cores by adding a new runtime api request * back out an unrelated change properly part of #1404 * av-store: handle QueryChunkAvailability * simplify QueryDataAvailability * remove extraneous whitespace * compact primitive imports
This commit is contained in:
committed by
GitHub
parent
756f95c407
commit
4b2cb04e56
@@ -149,16 +149,14 @@ fn process_message(db: &Arc<dyn KeyValueDB>, msg: AvailabilityStoreMessage) -> R
|
|||||||
tx.send(available_data(db, &hash).map(|d| d.data)).map_err(|_| oneshot::Canceled)?;
|
tx.send(available_data(db, &hash).map(|d| d.data)).map_err(|_| oneshot::Canceled)?;
|
||||||
}
|
}
|
||||||
QueryDataAvailability(hash, tx) => {
|
QueryDataAvailability(hash, tx) => {
|
||||||
let result = match available_data(db, &hash) {
|
tx.send(available_data(db, &hash).is_some()).map_err(|_| oneshot::Canceled)?;
|
||||||
Some(_) => true,
|
|
||||||
None => false,
|
|
||||||
};
|
|
||||||
|
|
||||||
tx.send(result).map_err(|_| oneshot::Canceled)?;
|
|
||||||
}
|
}
|
||||||
QueryChunk(hash, id, tx) => {
|
QueryChunk(hash, id, tx) => {
|
||||||
tx.send(get_chunk(db, &hash, id)?).map_err(|_| oneshot::Canceled)?;
|
tx.send(get_chunk(db, &hash, id)?).map_err(|_| oneshot::Canceled)?;
|
||||||
}
|
}
|
||||||
|
QueryChunkAvailability(hash, id, tx) => {
|
||||||
|
tx.send(get_chunk(db, &hash, id)?.is_some()).map_err(|_| oneshot::Canceled)?;
|
||||||
|
}
|
||||||
StoreChunk(hash, id, chunk, tx) => {
|
StoreChunk(hash, id, chunk, tx) => {
|
||||||
match store_chunk(db, &hash, id, chunk) {
|
match store_chunk(db, &hash, id, chunk) {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|||||||
@@ -26,11 +26,11 @@ use keystore::KeyStorePtr;
|
|||||||
use polkadot_node_subsystem::{
|
use polkadot_node_subsystem::{
|
||||||
messages::{
|
messages::{
|
||||||
self, AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
|
self, AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
|
||||||
BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage,
|
BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage, RuntimeApiError
|
||||||
},
|
},
|
||||||
util::{self, JobManager, JobTrait, ToJobTrait, Validator},
|
util::{self, JobManager, JobTrait, ToJobTrait, Validator},
|
||||||
};
|
};
|
||||||
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash};
|
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
|
||||||
use std::{convert::TryFrom, pin::Pin, time::Duration};
|
use std::{convert::TryFrom, pin::Pin, time::Duration};
|
||||||
use wasm_timer::{Delay, Instant};
|
use wasm_timer::{Delay, Instant};
|
||||||
|
|
||||||
@@ -125,16 +125,21 @@ pub enum Error {
|
|||||||
/// several errors collected into one
|
/// several errors collected into one
|
||||||
#[from]
|
#[from]
|
||||||
Multiple(Vec<Error>),
|
Multiple(Vec<Error>),
|
||||||
|
/// the runtime API failed to return what we wanted
|
||||||
|
#[from]
|
||||||
|
Runtime(RuntimeApiError),
|
||||||
}
|
}
|
||||||
|
|
||||||
// this function exists mainly to collect a bunch of potential error points into one.
|
// if there is a candidate pending availability, query the Availability Store
|
||||||
|
// for whether we have the availability chunk for our validator index.
|
||||||
async fn get_core_availability(
|
async fn get_core_availability(
|
||||||
relay_parent: Hash,
|
relay_parent: Hash,
|
||||||
core: CoreState,
|
core: CoreState,
|
||||||
|
validator_idx: ValidatorIndex,
|
||||||
sender: &mpsc::Sender<FromJob>,
|
sender: &mpsc::Sender<FromJob>,
|
||||||
) -> Result<bool, Error> {
|
) -> Result<bool, Error> {
|
||||||
use messages::{
|
use messages::{
|
||||||
AvailabilityStoreMessage::QueryDataAvailability,
|
AvailabilityStoreMessage::QueryChunkAvailability,
|
||||||
RuntimeApiRequest::CandidatePendingAvailability,
|
RuntimeApiRequest::CandidatePendingAvailability,
|
||||||
};
|
};
|
||||||
use FromJob::{AvailabilityStore, RuntimeApi};
|
use FromJob::{AvailabilityStore, RuntimeApi};
|
||||||
@@ -163,8 +168,9 @@ async fn get_core_availability(
|
|||||||
};
|
};
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
sender
|
sender
|
||||||
.send(AvailabilityStore(QueryDataAvailability(
|
.send(AvailabilityStore(QueryChunkAvailability(
|
||||||
committed_candidate_receipt.descriptor.pov_hash,
|
committed_candidate_receipt.descriptor.pov_hash,
|
||||||
|
validator_idx,
|
||||||
tx,
|
tx,
|
||||||
)))
|
)))
|
||||||
.await?;
|
.await?;
|
||||||
@@ -173,43 +179,42 @@ async fn get_core_availability(
|
|||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// the way this function works is not intuitive:
|
// delegates to the v1 runtime API
|
||||||
//
|
async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender<FromJob>) -> Result<Vec<CoreState>, Error> {
|
||||||
// - get the availability cores so we have a list of cores, in order.
|
use FromJob::RuntimeApi;
|
||||||
// - for each occupied core, fetch `candidate_pending_availability` from runtime
|
use messages::{
|
||||||
// - from there, we can get the `CandidateDescriptor`
|
RuntimeApiMessage::Request,
|
||||||
// - from there, we can send a `AvailabilityStore::QueryPoV` and set the indexed bit to 1 if it returns Some(_)
|
RuntimeApiRequest::AvailabilityCores,
|
||||||
|
};
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
sender.send(RuntimeApi(Request(relay_parent, AvailabilityCores(tx)))).await?;
|
||||||
|
match rx.await {
|
||||||
|
Ok(Ok(out)) => Ok(out),
|
||||||
|
Ok(Err(runtime_err)) => Err(runtime_err.into()),
|
||||||
|
Err(err) => Err(err.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// - get the list of core states from the runtime
|
||||||
|
// - for each core, concurrently determine chunk availability (see `get_core_availability`)
|
||||||
|
// - return the bitfield if there were no errors at any point in this process
|
||||||
|
// (otherwise, it's prone to false negatives)
|
||||||
async fn construct_availability_bitfield(
|
async fn construct_availability_bitfield(
|
||||||
relay_parent: Hash,
|
relay_parent: Hash,
|
||||||
|
validator_idx: ValidatorIndex,
|
||||||
sender: &mut mpsc::Sender<FromJob>,
|
sender: &mut mpsc::Sender<FromJob>,
|
||||||
) -> Result<Option<AvailabilityBitfield>, Error> {
|
) -> Result<AvailabilityBitfield, Error> {
|
||||||
use futures::lock::Mutex;
|
use futures::lock::Mutex;
|
||||||
|
|
||||||
use messages::RuntimeApiRequest::AvailabilityCores;
|
// get the set of availability cores from the runtime
|
||||||
use FromJob::RuntimeApi;
|
let availability_cores = get_availability_cores(relay_parent, sender).await?;
|
||||||
use RuntimeApiMessage::Request;
|
|
||||||
|
|
||||||
// request the availability cores metadata from runtime.
|
|
||||||
let (tx, rx) = oneshot::channel();
|
|
||||||
sender
|
|
||||||
.send(RuntimeApi(Request(relay_parent, AvailabilityCores(tx))))
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// we now need sender to be immutable so we can copy the reference to multiple concurrent closures
|
// we now need sender to be immutable so we can copy the reference to multiple concurrent closures
|
||||||
let sender = &*sender;
|
let sender = &*sender;
|
||||||
|
|
||||||
// wait for the cores
|
|
||||||
let availability_cores = match rx.await? {
|
|
||||||
Ok(a) => a,
|
|
||||||
Err(e) => {
|
|
||||||
log::warn!(target: "bitfield_signing", "Encountered a runtime API error: {:?}", e);
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// prepare outputs
|
// prepare outputs
|
||||||
let out =
|
let out = Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; availability_cores.len()));
|
||||||
Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; availability_cores.len()));
|
|
||||||
// in principle, we know that we never want concurrent access to the _same_ bit within the vec;
|
// in principle, we know that we never want concurrent access to the _same_ bit within the vec;
|
||||||
// we could `let out_ref = out.as_mut_ptr();` here instead, and manually assign bits, avoiding
|
// we could `let out_ref = out.as_mut_ptr();` here instead, and manually assign bits, avoiding
|
||||||
// any need to ever wait to lock this mutex.
|
// any need to ever wait to lock this mutex.
|
||||||
@@ -225,7 +230,7 @@ async fn construct_availability_bitfield(
|
|||||||
// we need the mutexes and explicit references above.
|
// we need the mutexes and explicit references above.
|
||||||
stream::iter(availability_cores.into_iter().enumerate())
|
stream::iter(availability_cores.into_iter().enumerate())
|
||||||
.for_each_concurrent(None, |(idx, core)| async move {
|
.for_each_concurrent(None, |(idx, core)| async move {
|
||||||
let availability = match get_core_availability(relay_parent, core, sender).await {
|
let availability = match get_core_availability(relay_parent, core, validator_idx, sender).await {
|
||||||
Ok(availability) => availability,
|
Ok(availability) => availability,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
errs_ref.lock().await.push(err);
|
errs_ref.lock().await.push(err);
|
||||||
@@ -238,7 +243,7 @@ async fn construct_availability_bitfield(
|
|||||||
|
|
||||||
let errs = errs.into_inner();
|
let errs = errs.into_inner();
|
||||||
if errs.is_empty() {
|
if errs.is_empty() {
|
||||||
Ok(Some(out.into_inner().into()))
|
Ok(out.into_inner().into())
|
||||||
} else {
|
} else {
|
||||||
Err(errs.into())
|
Err(errs.into())
|
||||||
}
|
}
|
||||||
@@ -275,10 +280,15 @@ impl JobTrait for BitfieldSigningJob {
|
|||||||
Delay::new_at(wait_until).await?;
|
Delay::new_at(wait_until).await?;
|
||||||
|
|
||||||
let bitfield =
|
let bitfield =
|
||||||
match construct_availability_bitfield(relay_parent, &mut sender).await?
|
match construct_availability_bitfield(relay_parent, validator.index(), &mut sender).await
|
||||||
{
|
{
|
||||||
None => return Ok(()),
|
Err(Error::Runtime(runtime_err)) => {
|
||||||
Some(b) => b,
|
// Don't take down the node on runtime API errors.
|
||||||
|
log::warn!(target: "bitfield_signing", "Encountered a runtime API error: {:?}", runtime_err);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Err(err) => return Err(err),
|
||||||
|
Ok(bitfield) => bitfield,
|
||||||
};
|
};
|
||||||
|
|
||||||
let signed_bitfield = validator.sign(bitfield);
|
let signed_bitfield = validator.sign(bitfield);
|
||||||
|
|||||||
@@ -242,7 +242,7 @@ pub enum AvailabilityStoreMessage {
|
|||||||
|
|
||||||
/// Query whether a `AvailableData` exists within the AV Store.
|
/// Query whether a `AvailableData` exists within the AV Store.
|
||||||
///
|
///
|
||||||
/// This is useful in cases like bitfield signing, when existence
|
/// This is useful in cases when existence
|
||||||
/// matters, but we don't want to necessarily pass around multiple
|
/// matters, but we don't want to necessarily pass around multiple
|
||||||
/// megabytes of data to get a single bit of information.
|
/// megabytes of data to get a single bit of information.
|
||||||
QueryDataAvailability(Hash, oneshot::Sender<bool>),
|
QueryDataAvailability(Hash, oneshot::Sender<bool>),
|
||||||
@@ -250,6 +250,13 @@ pub enum AvailabilityStoreMessage {
|
|||||||
/// Query an `ErasureChunk` from the AV store.
|
/// Query an `ErasureChunk` from the AV store.
|
||||||
QueryChunk(Hash, ValidatorIndex, oneshot::Sender<Option<ErasureChunk>>),
|
QueryChunk(Hash, ValidatorIndex, oneshot::Sender<Option<ErasureChunk>>),
|
||||||
|
|
||||||
|
/// Query whether an `ErasureChunk` exists within the AV Store.
|
||||||
|
///
|
||||||
|
/// This is useful in cases like bitfield signing, when existence
|
||||||
|
/// matters, but we don't want to necessarily pass around large
|
||||||
|
/// quantities of data to get a single bit of information.
|
||||||
|
QueryChunkAvailability(Hash, ValidatorIndex, oneshot::Sender<bool>),
|
||||||
|
|
||||||
/// Store an `ErasureChunk` in the AV store.
|
/// Store an `ErasureChunk` in the AV store.
|
||||||
///
|
///
|
||||||
/// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed.
|
/// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed.
|
||||||
@@ -269,6 +276,7 @@ impl AvailabilityStoreMessage {
|
|||||||
Self::QueryAvailableData(hash, _) => Some(*hash),
|
Self::QueryAvailableData(hash, _) => Some(*hash),
|
||||||
Self::QueryDataAvailability(hash, _) => Some(*hash),
|
Self::QueryDataAvailability(hash, _) => Some(*hash),
|
||||||
Self::QueryChunk(hash, _, _) => Some(*hash),
|
Self::QueryChunk(hash, _, _) => Some(*hash),
|
||||||
|
Self::QueryChunkAvailability(hash, _, _) => Some(*hash),
|
||||||
Self::StoreChunk(hash, _, _, _) => Some(*hash),
|
Self::StoreChunk(hash, _, _, _) => Some(*hash),
|
||||||
Self::StoreAvailableData(hash, _, _, _, _) => Some(*hash),
|
Self::StoreAvailableData(hash, _, _, _, _) => Some(*hash),
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user