From c418758ebc1f23869b9540dcb9a0538c7b608907 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 5 Nov 2020 11:56:52 +0100 Subject: [PATCH] Simplify the bitfield signing job (#1920) Besides that the pr also adds a simple test. --- polkadot/Cargo.lock | 2 +- .../node/core/bitfield-signing/Cargo.toml | 2 +- .../node/core/bitfield-signing/src/lib.rs | 216 ++++++++++-------- 3 files changed, 118 insertions(+), 102 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 5bb4d4f5e7..21944abf45 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5000,7 +5000,7 @@ dependencies = [ name = "polkadot-node-core-bitfield-signing" version = "0.1.0" dependencies = [ - "bitvec", + "derive_more", "futures 0.3.5", "log 0.4.11", "polkadot-node-subsystem", diff --git a/polkadot/node/core/bitfield-signing/Cargo.toml b/polkadot/node/core/bitfield-signing/Cargo.toml index 1ad319617a..ef778186a4 100644 --- a/polkadot/node/core/bitfield-signing/Cargo.toml +++ b/polkadot/node/core/bitfield-signing/Cargo.toml @@ -5,7 +5,6 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] -bitvec = "0.17.4" futures = "0.3.5" log = "0.4.11" polkadot-primitives = { path = "../../../primitives" } @@ -14,3 +13,4 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } wasm-timer = "0.2.4" thiserror = "1.0.21" +derive_more = "0.99.11" diff --git a/polkadot/node/core/bitfield-signing/src/lib.rs b/polkadot/node/core/bitfield-signing/src/lib.rs index 48bc8df70a..137fa4dad2 100644 --- a/polkadot/node/core/bitfield-signing/src/lib.rs +++ b/polkadot/node/core/bitfield-signing/src/lib.rs @@ -16,20 +16,16 @@ //! The bitfield signing subsystem produces `SignedAvailabilityBitfield`s once per block. -#![deny(unused_crate_dependencies, unused_results)] +#![deny(unused_crate_dependencies)] #![warn(missing_docs)] +#![recursion_limit="256"] -use bitvec::bitvec; -use futures::{ - channel::{mpsc, oneshot}, - prelude::*, - stream, Future, -}; +use futures::{channel::{mpsc, oneshot}, lock::Mutex, prelude::*, future, Future}; use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr}; use polkadot_node_subsystem::{ messages::{ - self, AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage, - BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage, + AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage, + BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage, RuntimeApiRequest, }, errors::RuntimeApiError, }; @@ -38,7 +34,7 @@ use polkadot_node_subsystem_util::{ metrics::{self, prometheus}, }; use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex}; -use std::{convert::TryFrom, pin::Pin, time::Duration}; +use std::{convert::TryFrom, pin::Pin, time::Duration, iter::FromIterator}; use wasm_timer::{Delay, Instant}; use thiserror::Error; @@ -85,6 +81,7 @@ impl From for ToJob { /// Messages which may be sent from a `BitfieldSigningJob`. #[allow(missing_docs)] +#[derive(Debug, derive_more::From)] pub enum FromJob { AvailabilityStore(AvailabilityStoreMessage), BitfieldDistribution(BitfieldDistributionMessage), @@ -132,9 +129,6 @@ pub enum Error { /// a mspc channel failed to send #[error(transparent)] MpscSend(#[from] mpsc::SendError), - /// several errors collected into one - #[error("Multiple errours occured: {0:?}")] - Multiple(Vec), /// the runtime API failed to return what we wanted #[error(transparent)] Runtime(#[from] RuntimeApiError), @@ -143,31 +137,25 @@ pub enum Error { Keystore(KeystoreError), } -// if there is a candidate pending availability, query the Availability Store -// for whether we have the availability chunk for our validator index. +/// If there is a candidate pending availability, query the Availability Store +/// for whether we have the availability chunk for our validator index. async fn get_core_availability( relay_parent: Hash, core: CoreState, validator_idx: ValidatorIndex, - sender: &mpsc::Sender, + sender: &Mutex<&mut mpsc::Sender>, ) -> Result { - use messages::{ - AvailabilityStoreMessage::QueryChunkAvailability, - RuntimeApiRequest::CandidatePendingAvailability, - }; - use FromJob::{AvailabilityStore, RuntimeApi}; - use RuntimeApiMessage::Request; - - // we have to (cheaply) clone this sender so we can mutate it to actually send anything - let mut sender = sender.clone(); - if let CoreState::Occupied(core) = core { let (tx, rx) = oneshot::channel(); sender - .send(RuntimeApi(Request( - relay_parent, - CandidatePendingAvailability(core.para_id, tx), - ))) + .lock() + .await + .send( + RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::CandidatePendingAvailability(core.para_id, tx), + ).into(), + ) .await?; let committed_candidate_receipt = match rx.await? { @@ -181,27 +169,26 @@ async fn get_core_availability( }; let (tx, rx) = oneshot::channel(); sender - .send(AvailabilityStore(QueryChunkAvailability( - committed_candidate_receipt.descriptor.pov_hash, - validator_idx, - tx, - ))) + .lock() + .await + .send( + AvailabilityStoreMessage::QueryChunkAvailability( + committed_candidate_receipt.descriptor.pov_hash, + validator_idx, + tx, + ).into(), + ) .await?; return rx.await.map_err(Into::into); } + Ok(false) } -// delegates to the v1 runtime API +/// delegates to the v1 runtime API async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender) -> Result, Error> { - use FromJob::RuntimeApi; - use messages::{ - RuntimeApiMessage::Request, - RuntimeApiRequest::AvailabilityCores, - }; - let (tx, rx) = oneshot::channel(); - sender.send(RuntimeApi(Request(relay_parent, AvailabilityCores(tx)))).await?; + sender.send(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::AvailabilityCores(tx)).into()).await?; match rx.await { Ok(Ok(out)) => Ok(out), Ok(Err(runtime_err)) => Err(runtime_err.into()), @@ -209,57 +196,28 @@ async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender, ) -> Result { - use futures::lock::Mutex; - // get the set of availability cores from the runtime let availability_cores = get_availability_cores(relay_parent, sender).await?; - // we now need sender to be immutable so we can copy the reference to multiple concurrent closures - let sender = &*sender; + // Wrap the sender in a Mutex to share it between the futures. + let sender = Mutex::new(sender); - // prepare outputs - let out = Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; availability_cores.len())); - // in principle, we know that we never want concurrent access to the _same_ bit within the vec; - // we could `let out_ref = out.as_mut_ptr();` here instead, and manually assign bits, avoiding - // any need to ever wait to lock this mutex. - // in practice, it's safer to just use the mutex, and speed optimizations should wait until - // benchmarking proves that they are necessary. - let out_ref = &out; - let errs = Mutex::new(Vec::new()); - let errs_ref = &errs; + // Handle all cores concurrently + // `try_join_all` returns all results in the same order as the input futures. + let results = future::try_join_all( + availability_cores.into_iter().map(|core| get_core_availability(relay_parent, core, validator_idx, &sender)), + ).await?; - // Handle each (idx, core) pair concurrently - // - // In principle, this work is all concurrent, not parallel. In practice, we can't guarantee it, which is why - // we need the mutexes and explicit references above. - stream::iter(availability_cores.into_iter().enumerate()) - .for_each_concurrent(None, |(idx, core)| async move { - let availability = match get_core_availability(relay_parent, core, validator_idx, sender).await { - Ok(availability) => availability, - Err(err) => { - errs_ref.lock().await.push(err); - return; - } - }; - out_ref.lock().await.set(idx, availability); - }) - .await; - - let errs = errs.into_inner(); - if errs.is_empty() { - Ok(out.into_inner().into()) - } else { - Err(Error::Multiple(errs.into())) - } + Ok(AvailabilityBitfield(FromIterator::from_iter(results))) } #[derive(Clone)] @@ -312,7 +270,6 @@ impl JobTrait for BitfieldSigningJob { mut sender: mpsc::Sender, ) -> Pin> + Send>> { async move { - // figure out when to wait to let wait_until = Instant::now() + JOB_DELAY; // now do all the work we can before we need to wait for the availability store @@ -344,24 +301,83 @@ impl JobTrait for BitfieldSigningJob { .map_err(|e| Error::Keystore(e))?; metrics.on_bitfield_signed(); - // make an anonymous scope to contain some use statements to simplify creating the outbound message - { - use BitfieldDistributionMessage::DistributeBitfield; - use FromJob::BitfieldDistribution; - - sender - .send(BitfieldDistribution(DistributeBitfield( - relay_parent, - signed_bitfield, - ))) - .await - .map_err(Into::into) - } + sender + .send(BitfieldDistributionMessage::DistributeBitfield(relay_parent, signed_bitfield).into()) + .await + .map_err(Into::into) } .boxed() } } /// BitfieldSigningSubsystem manages a number of bitfield signing jobs. -pub type BitfieldSigningSubsystem = - JobManager; +pub type BitfieldSigningSubsystem = JobManager; + +#[cfg(test)] +mod tests { + use super::*; + use futures::{pin_mut, executor::block_on}; + use polkadot_primitives::v1::{OccupiedCore}; + use FromJob::*; + + fn occupied_core(para_id: u32) -> CoreState { + CoreState::Occupied(OccupiedCore { + para_id: para_id.into(), + group_responsible: para_id.into(), + next_up_on_available: None, + occupied_since: 100_u32, + time_out_at: 200_u32, + next_up_on_time_out: None, + availability: Default::default(), + }) + } + + #[test] + fn construct_availability_bitfield_works() { + block_on(async move { + let (mut sender, mut receiver) = mpsc::channel(10); + let relay_parent = Hash::default(); + let validator_index = 1u32; + + let future = construct_availability_bitfield(relay_parent, validator_index, &mut sender).fuse(); + pin_mut!(future); + + loop { + futures::select! { + m = receiver.next() => match m.unwrap() { + RuntimeApi(RuntimeApiMessage::Request(rp, RuntimeApiRequest::AvailabilityCores(tx))) => { + assert_eq!(relay_parent, rp); + tx.send(Ok(vec![CoreState::Free, occupied_core(1), occupied_core(2)])).unwrap(); + }, + RuntimeApi( + RuntimeApiMessage::Request(rp, RuntimeApiRequest::CandidatePendingAvailability(para_id, tx)) + ) => { + assert_eq!(relay_parent, rp); + + if para_id == 1.into() { + tx.send(Ok(Some(Default::default()))).unwrap(); + } else { + tx.send(Ok(None)).unwrap(); + } + }, + AvailabilityStore(AvailabilityStoreMessage::QueryChunkAvailability(_, vidx, tx)) => { + assert_eq!(validator_index, vidx); + + tx.send(true).unwrap(); + }, + o => panic!("Unknown message: {:?}", o), + }, + r = future => match r { + Ok(r) => { + assert!(!r.0.get(0).unwrap()); + assert!(r.0.get(1).unwrap()); + assert!(!r.0.get(2).unwrap()); + break + }, + Err(e) => panic!("Failed: {:?}", e), + }, + } + } + }); + } +}