diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 386ff99f27..14f027dc86 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -149,16 +149,14 @@ fn process_message(db: &Arc, msg: AvailabilityStoreMessage) -> R tx.send(available_data(db, &hash).map(|d| d.data)).map_err(|_| oneshot::Canceled)?; } QueryDataAvailability(hash, tx) => { - let result = match available_data(db, &hash) { - Some(_) => true, - None => false, - }; - - tx.send(result).map_err(|_| oneshot::Canceled)?; + tx.send(available_data(db, &hash).is_some()).map_err(|_| oneshot::Canceled)?; } QueryChunk(hash, id, tx) => { tx.send(get_chunk(db, &hash, id)?).map_err(|_| oneshot::Canceled)?; } + QueryChunkAvailability(hash, id, tx) => { + tx.send(get_chunk(db, &hash, id)?.is_some()).map_err(|_| oneshot::Canceled)?; + } StoreChunk(hash, id, chunk, tx) => { match store_chunk(db, &hash, id, chunk) { Err(e) => { @@ -394,7 +392,7 @@ mod tests { let chunk_msg = AvailabilityStoreMessage::StoreChunk( relay_parent, - validator_index, + validator_index, chunk.clone(), tx, ); @@ -436,7 +434,7 @@ mod tests { global_validation, local_validation, }; - + let available_data = AvailableData { pov, omitted_validation, diff --git a/polkadot/node/core/bitfield-signing/src/lib.rs b/polkadot/node/core/bitfield-signing/src/lib.rs index 601f0b41b9..e3bb68cec9 100644 --- a/polkadot/node/core/bitfield-signing/src/lib.rs +++ b/polkadot/node/core/bitfield-signing/src/lib.rs @@ -26,11 +26,11 @@ use keystore::KeyStorePtr; use polkadot_node_subsystem::{ messages::{ self, AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage, - BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage, + BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage, RuntimeApiError }, util::{self, JobManager, JobTrait, ToJobTrait, Validator}, }; -use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash}; +use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex}; use std::{convert::TryFrom, pin::Pin, time::Duration}; use wasm_timer::{Delay, Instant}; @@ -125,16 +125,21 @@ pub enum Error { /// several errors collected into one #[from] Multiple(Vec), + /// the runtime API failed to return what we wanted + #[from] + Runtime(RuntimeApiError), } -// this function exists mainly to collect a bunch of potential error points into one. +// if there is a candidate pending availability, query the Availability Store +// for whether we have the availability chunk for our validator index. async fn get_core_availability( relay_parent: Hash, core: CoreState, + validator_idx: ValidatorIndex, sender: &mpsc::Sender, ) -> Result { use messages::{ - AvailabilityStoreMessage::QueryDataAvailability, + AvailabilityStoreMessage::QueryChunkAvailability, RuntimeApiRequest::CandidatePendingAvailability, }; use FromJob::{AvailabilityStore, RuntimeApi}; @@ -163,8 +168,9 @@ async fn get_core_availability( }; let (tx, rx) = oneshot::channel(); sender - .send(AvailabilityStore(QueryDataAvailability( + .send(AvailabilityStore(QueryChunkAvailability( committed_candidate_receipt.descriptor.pov_hash, + validator_idx, tx, ))) .await?; @@ -173,43 +179,42 @@ async fn get_core_availability( Ok(false) } -// the way this function works is not intuitive: -// -// - get the availability cores so we have a list of cores, in order. -// - for each occupied core, fetch `candidate_pending_availability` from runtime -// - from there, we can get the `CandidateDescriptor` -// - from there, we can send a `AvailabilityStore::QueryPoV` and set the indexed bit to 1 if it returns Some(_) +// delegates to the v1 runtime API +async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender) -> Result, Error> { + use FromJob::RuntimeApi; + use messages::{ + RuntimeApiMessage::Request, + RuntimeApiRequest::AvailabilityCores, + }; + + let (tx, rx) = oneshot::channel(); + sender.send(RuntimeApi(Request(relay_parent, AvailabilityCores(tx)))).await?; + match rx.await { + Ok(Ok(out)) => Ok(out), + Ok(Err(runtime_err)) => Err(runtime_err.into()), + Err(err) => Err(err.into()) + } +} + +// - get the list of core states from the runtime +// - for each core, concurrently determine chunk availability (see `get_core_availability`) +// - return the bitfield if there were no errors at any point in this process +// (otherwise, it's prone to false negatives) async fn construct_availability_bitfield( relay_parent: Hash, + validator_idx: ValidatorIndex, sender: &mut mpsc::Sender, -) -> Result, Error> { +) -> Result { use futures::lock::Mutex; - use messages::RuntimeApiRequest::AvailabilityCores; - use FromJob::RuntimeApi; - use RuntimeApiMessage::Request; - - // request the availability cores metadata from runtime. - let (tx, rx) = oneshot::channel(); - sender - .send(RuntimeApi(Request(relay_parent, AvailabilityCores(tx)))) - .await?; + // get the set of availability cores from the runtime + let availability_cores = get_availability_cores(relay_parent, sender).await?; // we now need sender to be immutable so we can copy the reference to multiple concurrent closures let sender = &*sender; - // wait for the cores - let availability_cores = match rx.await? { - Ok(a) => a, - Err(e) => { - log::warn!(target: "bitfield_signing", "Encountered a runtime API error: {:?}", e); - return Ok(None); - } - }; - // prepare outputs - let out = - Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; availability_cores.len())); + let out = Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; availability_cores.len())); // in principle, we know that we never want concurrent access to the _same_ bit within the vec; // we could `let out_ref = out.as_mut_ptr();` here instead, and manually assign bits, avoiding // any need to ever wait to lock this mutex. @@ -225,7 +230,7 @@ async fn construct_availability_bitfield( // we need the mutexes and explicit references above. stream::iter(availability_cores.into_iter().enumerate()) .for_each_concurrent(None, |(idx, core)| async move { - let availability = match get_core_availability(relay_parent, core, sender).await { + let availability = match get_core_availability(relay_parent, core, validator_idx, sender).await { Ok(availability) => availability, Err(err) => { errs_ref.lock().await.push(err); @@ -238,7 +243,7 @@ async fn construct_availability_bitfield( let errs = errs.into_inner(); if errs.is_empty() { - Ok(Some(out.into_inner().into())) + Ok(out.into_inner().into()) } else { Err(errs.into()) } @@ -275,10 +280,15 @@ impl JobTrait for BitfieldSigningJob { Delay::new_at(wait_until).await?; let bitfield = - match construct_availability_bitfield(relay_parent, &mut sender).await? + match construct_availability_bitfield(relay_parent, validator.index(), &mut sender).await { - None => return Ok(()), - Some(b) => b, + Err(Error::Runtime(runtime_err)) => { + // Don't take down the node on runtime API errors. + log::warn!(target: "bitfield_signing", "Encountered a runtime API error: {:?}", runtime_err); + return Ok(()); + } + Err(err) => return Err(err), + Ok(bitfield) => bitfield, }; let signed_bitfield = validator.sign(bitfield); diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index d6bd4f45d8..895e936d5b 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -242,7 +242,7 @@ pub enum AvailabilityStoreMessage { /// Query whether a `AvailableData` exists within the AV Store. /// - /// This is useful in cases like bitfield signing, when existence + /// This is useful in cases when existence /// matters, but we don't want to necessarily pass around multiple /// megabytes of data to get a single bit of information. QueryDataAvailability(Hash, oneshot::Sender), @@ -250,6 +250,13 @@ pub enum AvailabilityStoreMessage { /// Query an `ErasureChunk` from the AV store. QueryChunk(Hash, ValidatorIndex, oneshot::Sender>), + /// Query whether an `ErasureChunk` exists within the AV Store. + /// + /// This is useful in cases like bitfield signing, when existence + /// matters, but we don't want to necessarily pass around large + /// quantities of data to get a single bit of information. + QueryChunkAvailability(Hash, ValidatorIndex, oneshot::Sender), + /// Store an `ErasureChunk` in the AV store. /// /// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed. @@ -269,6 +276,7 @@ impl AvailabilityStoreMessage { Self::QueryAvailableData(hash, _) => Some(*hash), Self::QueryDataAvailability(hash, _) => Some(*hash), Self::QueryChunk(hash, _, _) => Some(*hash), + Self::QueryChunkAvailability(hash, _, _) => Some(*hash), Self::StoreChunk(hash, _, _, _) => Some(*hash), Self::StoreAvailableData(hash, _, _, _, _) => Some(*hash), }