diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index d49b76de1e..56bfc007d3 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5976,6 +5976,7 @@ dependencies = [ "futures-timer 3.0.2", "kvdb", "kvdb-memorydb", + "lru", "maplit", "merlin", "parity-scale-codec", diff --git a/polkadot/node/core/approval-voting/Cargo.toml b/polkadot/node/core/approval-voting/Cargo.toml index 047f3bcafa..104a52621e 100644 --- a/polkadot/node/core/approval-voting/Cargo.toml +++ b/polkadot/node/core/approval-voting/Cargo.toml @@ -10,6 +10,7 @@ futures-timer = "3.0.2" parity-scale-codec = { version = "2.0.0", default-features = false, features = ["bit-vec", "derive"] } tracing = "0.1.26" bitvec = { version = "0.20.1", default-features = false, features = ["alloc"] } +lru = "0.6" merlin = "2.0" schnorrkel = "0.9.1" kvdb = "0.9.0" diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index e219671920..54943c40b6 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -25,24 +25,25 @@ use polkadot_node_subsystem::{ messages::{ AssignmentCheckError, AssignmentCheckResult, ApprovalCheckError, ApprovalCheckResult, ApprovalVotingMessage, RuntimeApiMessage, RuntimeApiRequest, ChainApiMessage, - ApprovalDistributionMessage, ValidationFailed, CandidateValidationMessage, + ApprovalDistributionMessage, CandidateValidationMessage, AvailabilityRecoveryMessage, }, errors::RecoveryError, Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SpawnedSubsystem, - FromOverseer, OverseerSignal, + FromOverseer, OverseerSignal, SubsystemSender, }; use polkadot_node_subsystem_util::{ + TimeoutExt, metrics::{self, prometheus}, rolling_session_window::RollingSessionWindow, }; use polkadot_primitives::v1::{ ValidatorIndex, Hash, SessionIndex, SessionInfo, CandidateHash, - CandidateReceipt, BlockNumber, PersistedValidationData, - ValidationCode, CandidateDescriptor, ValidatorPair, ValidatorSignature, ValidatorId, + CandidateReceipt, BlockNumber, + ValidatorPair, ValidatorSignature, ValidatorId, CandidateIndex, GroupIndex, ApprovalVote, }; -use polkadot_node_primitives::{ValidationResult, PoV}; +use polkadot_node_primitives::ValidationResult; use polkadot_node_primitives::approval::{ IndirectAssignmentCert, IndirectSignedApprovalVote, DelayTranche, BlockApprovalMeta, }; @@ -55,12 +56,14 @@ use sp_application_crypto::Pair; use kvdb::KeyValueDB; use futures::prelude::*; -use futures::future::RemoteHandle; -use futures::channel::{mpsc, oneshot}; +use futures::future::{BoxFuture, RemoteHandle}; +use futures::channel::oneshot; +use futures::stream::FuturesUnordered; use std::collections::{BTreeMap, HashMap, HashSet}; use std::collections::btree_map::Entry; use std::sync::Arc; +use std::time::Duration; use approval_checking::RequiredTranches; use persisted_entries::{ApprovalEntry, CandidateEntry, BlockEntry}; @@ -80,6 +83,8 @@ use crate::approval_db::v1::Config as DatabaseConfig; mod tests; const APPROVAL_SESSIONS: SessionIndex = 6; +const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120); +const APPROVAL_CACHE_SIZE: usize = 1024; const LOG_TARGET: &str = "parachain::approval-voting"; /// Configuration for the approval voting subsystem @@ -341,21 +346,10 @@ impl Subsystem for ApprovalVotingSubsystem } } -enum BackgroundRequest { - ApprovalVote(ApprovalVoteRequest), - CandidateValidation( - PersistedValidationData, - ValidationCode, - CandidateDescriptor, - Arc, - oneshot::Sender>, - ), -} - +#[derive(Debug, Clone)] struct ApprovalVoteRequest { validator_index: ValidatorIndex, block_hash: Hash, - candidate_index: usize, } #[derive(Default)] @@ -539,6 +533,109 @@ struct ApprovalStatus { block_tick: Tick, } +#[derive(Copy, Clone)] +enum ApprovalOutcome { + Approved, + Failed, + TimedOut, +} + +struct ApprovalState { + validator_index: ValidatorIndex, + candidate_hash: CandidateHash, + approval_outcome: ApprovalOutcome, +} + +impl ApprovalState { + fn approved( + validator_index: ValidatorIndex, + candidate_hash: CandidateHash, + ) -> Self { + Self { + validator_index, + candidate_hash, + approval_outcome: ApprovalOutcome::Approved, + } + } + fn failed( + validator_index: ValidatorIndex, + candidate_hash: CandidateHash, + ) -> Self { + Self { + validator_index, + candidate_hash, + approval_outcome: ApprovalOutcome::Failed, + } + } +} + +struct CurrentlyCheckingSet { + candidate_hash_map: HashMap>, + currently_checking: FuturesUnordered>, +} + +impl Default for CurrentlyCheckingSet { + fn default() -> Self { + Self { + candidate_hash_map: HashMap::new(), + currently_checking: FuturesUnordered::new(), + } + } +} + +impl CurrentlyCheckingSet { + // This function will lazily launch approval voting work whenever the + // candidate is not already undergoing validation. + pub async fn insert_relay_block_hash( + &mut self, + candidate_hash: CandidateHash, + validator_index: ValidatorIndex, + relay_block: Hash, + launch_work: impl Future>>, + ) -> SubsystemResult<()> { + let val = self.candidate_hash_map + .entry(candidate_hash) + .or_insert(Default::default()); + + if let Err(k) = val.binary_search_by_key(&relay_block, |v| *v) { + let _ = val.insert(k, relay_block); + let work = launch_work.await?; + self.currently_checking.push( + Box::pin(async move { + match work.timeout(APPROVAL_CHECKING_TIMEOUT).await { + None => ApprovalState { + candidate_hash, + validator_index, + approval_outcome: ApprovalOutcome::TimedOut, + }, + Some(approval_state) => approval_state, + } + }) + ); + } + + Ok(()) + } + + pub async fn next( + &mut self, + approvals_cache: &mut lru::LruCache, + ) -> (Vec, ApprovalState) { + if !self.currently_checking.is_empty() { + if let Some(approval_state) = self.currently_checking + .next() + .await + { + let out = self.candidate_hash_map.remove(&approval_state.candidate_hash).unwrap_or_default(); + approvals_cache.put(approval_state.candidate_hash.clone(), approval_state.approval_outcome.clone()); + return (out, approval_state); + } + } + + future::pending().await + } +} + struct State { session_window: RollingSessionWindow, keystore: Arc, @@ -600,7 +697,7 @@ impl State { } } -#[derive(Debug)] +#[derive(Debug, Clone)] enum Action { ScheduleWakeup { block_hash: Hash, @@ -611,20 +708,20 @@ enum Action { WriteBlockEntry(BlockEntry), WriteCandidateEntry(CandidateHash, CandidateEntry), LaunchApproval { + candidate_hash: CandidateHash, indirect_cert: IndirectAssignmentCert, assignment_tranche: DelayTranche, - relay_block_number: BlockNumber, + relay_block_hash: Hash, candidate_index: CandidateIndex, session: SessionIndex, candidate: CandidateReceipt, backing_group: GroupIndex, }, + IssueApproval(CandidateHash, ApprovalVoteRequest), BecomeActive, Conclude, } -type BackgroundTaskMap = BTreeMap>>; - async fn run( mut ctx: C, mut subsystem: ApprovalVotingSubsystem, @@ -633,7 +730,6 @@ async fn run( ) -> SubsystemResult<()> where C: SubsystemContext { - let (background_tx, background_rx) = mpsc::channel::(64); let mut state = State { session_window: RollingSessionWindow::new(APPROVAL_SESSIONS), keystore: subsystem.keystore, @@ -644,12 +740,10 @@ async fn run( }; let mut wakeups = Wakeups::default(); - - // map block numbers to background work. - let mut background_tasks = BTreeMap::new(); + let mut currently_checking_set = CurrentlyCheckingSet::default(); + let mut approvals_cache = lru::LruCache::new(APPROVAL_CACHE_SIZE); let mut last_finalized_height: Option = None; - let mut background_rx = background_rx.fuse(); let db_writer = &*subsystem.db; @@ -676,10 +770,6 @@ async fn run( &mut wakeups, ).await?; - if let Some(finalized_height) = last_finalized_height { - cleanup_background_tasks(finalized_height, &mut background_tasks); - } - if let Mode::Syncing(ref mut oracle) = subsystem.mode { if !oracle.is_major_syncing() { // note that we're active before processing other actions. @@ -689,28 +779,46 @@ async fn run( actions } - background_request = background_rx.next().fuse() => { - if let Some(req) = background_request { - handle_background_request( - &mut ctx, - &mut state, - &subsystem.metrics, - req, - ).await? - } else { - Vec::new() + approval_state = currently_checking_set.next(&mut approvals_cache).fuse() => { + let mut actions = Vec::new(); + let ( + relay_block_hashes, + ApprovalState { + validator_index, + candidate_hash, + approval_outcome, + } + ) = approval_state; + + if matches!(approval_outcome, ApprovalOutcome::Approved) { + let mut approvals: Vec = relay_block_hashes + .into_iter() + .map(|block_hash| + Action::IssueApproval( + candidate_hash, + ApprovalVoteRequest { + validator_index, + block_hash, + }, + ) + ) + .collect(); + actions.append(&mut approvals); } + + actions } }; if handle_actions( &mut ctx, + &mut state, &subsystem.metrics, &mut wakeups, + &mut currently_checking_set, + &mut approvals_cache, db_writer, subsystem.db_config, - &background_tx, - &mut background_tasks, &mut subsystem.mode, actions, ).await? { @@ -721,22 +829,42 @@ async fn run( Ok(()) } +// Handle actions is a function that accepts a set of instructions +// and subsequently updates the underlying approvals_db in accordance +// with the linear set of instructions passed in. Therefore, actions +// must be processed in series to ensure that earlier actions are not +// negated/corrupted by later actions being executed out-of-order. +// +// However, certain Actions can cause additional actions to need to be +// processed by this function. In order to preserve linearity, we would +// need to handle these newly generated actions before we finalize +// completing additional actions in the submitted sequence of actions. +// +// Since recursive async functions are not not stable yet, we are +// forced to modify the actions iterator on the fly whenever a new set +// of actions are generated by handling a single action. +// +// This particular problem statement is specified in issue 3311: +// https://github.com/paritytech/polkadot/issues/3311 +// // returns `true` if any of the actions was a `Conclude` command. async fn handle_actions( ctx: &mut impl SubsystemContext, + state: &mut State, metrics: &Metrics, wakeups: &mut Wakeups, + currently_checking_set: &mut CurrentlyCheckingSet, + approvals_cache: &mut lru::LruCache, db: &dyn KeyValueDB, db_config: DatabaseConfig, - background_tx: &mpsc::Sender, - background_tasks: &mut BackgroundTaskMap, mode: &mut Mode, - actions: impl IntoIterator, + actions: Vec, ) -> SubsystemResult { let mut transaction = approval_db::v1::Transaction::new(db_config); let mut conclude = false; - for action in actions { + let mut actions_iter = actions.into_iter(); + while let Some(action) = actions_iter.next() { match action { Action::ScheduleWakeup { block_hash, @@ -752,10 +880,33 @@ async fn handle_actions( Action::WriteCandidateEntry(candidate_hash, candidate_entry) => { transaction.put_candidate_entry(candidate_hash, candidate_entry.into()); } + Action::IssueApproval(candidate_hash, approval_request) => { + let mut sender = ctx.sender().clone(); + // Note that the IssueApproval action will create additional + // actions that will need to all be processed before we can + // handle the next action in the set passed to the ambient + // function. + // + // In order to achieve this, we append the existing iterator + // to the end of the iterator made up of these newly generated + // actions. + // + // Note that chaining these iterators is O(n) as we must consume + // the prior iterator. + let next_actions: Vec = issue_approval( + &mut sender, + state, + metrics, + candidate_hash, + approval_request, + )?.into_iter().map(|v| v.clone()).chain(actions_iter).collect(); + actions_iter = next_actions.into_iter(); + } Action::LaunchApproval { + candidate_hash, indirect_cert, assignment_tranche, - relay_block_number, + relay_block_hash, candidate_index, session, candidate, @@ -773,20 +924,42 @@ async fn handle_actions( candidate_index, ).into()); - let handle = launch_approval( - ctx, - metrics.clone(), - background_tx.clone(), - session, - &candidate, - validator_index, - block_hash, - candidate_index as _, - backing_group, - ).await?; - - if let Some(handle) = handle { - background_tasks.entry(relay_block_number).or_default().push(handle); + match approvals_cache.get(&candidate_hash) { + Some(ApprovalOutcome::Approved) => { + let new_actions: Vec = std::iter::once( + Action::IssueApproval( + candidate_hash, + ApprovalVoteRequest { + validator_index, + block_hash, + } + ) + ) + .map(|v| v.clone()) + .chain(actions_iter) + .collect(); + actions_iter = new_actions.into_iter(); + }, + None => { + let ctx = &mut *ctx; + currently_checking_set.insert_relay_block_hash( + candidate_hash, + validator_index, + relay_block_hash, + async move { + launch_approval( + ctx, + metrics.clone(), + session, + candidate, + validator_index, + block_hash, + backing_group, + ).await + } + ).await?; + } + Some(_) => {}, } } Action::BecomeActive => { @@ -812,19 +985,6 @@ async fn handle_actions( Ok(conclude) } -// Clean up all background tasks which are no longer needed as they correspond to a -// finalized block. -fn cleanup_background_tasks( - current_finalized_block: BlockNumber, - tasks: &mut BackgroundTaskMap, -) { - let after = tasks.split_off(&(current_finalized_block + 1)); - *tasks = after; - - // tasks up to the finalized block are dropped, and `RemoteHandle` cancels - // the task on drop. -} - fn distribution_messages_for_activation<'a>( db: impl DBReader + 'a, ) -> SubsystemResult> { @@ -1037,36 +1197,6 @@ async fn handle_from_overseer( Ok(actions) } -async fn handle_background_request( - ctx: &mut impl SubsystemContext, - state: &State, - metrics: &Metrics, - request: BackgroundRequest, -) -> SubsystemResult> { - match request { - BackgroundRequest::ApprovalVote(vote_request) => { - issue_approval(ctx, state, metrics, vote_request) - } - BackgroundRequest::CandidateValidation( - validation_data, - validation_code, - descriptor, - pov, - tx, - ) => { - ctx.send_message(CandidateValidationMessage::ValidateFromExhaustive( - validation_data, - validation_code, - descriptor, - pov, - tx, - ).into()).await; - - Ok(Vec::new()) - } - } -} - async fn handle_approved_ancestor( ctx: &mut impl SubsystemContext, db: &impl DBReader, @@ -1882,9 +2012,10 @@ fn process_wakeup( // sanity: should always be present. actions.push(Action::LaunchApproval { + candidate_hash, indirect_cert, assignment_tranche: tranche, - relay_block_number: block_entry.block_number(), + relay_block_hash: relay_block, candidate_index: i as _, session: block_entry.session(), candidate: candidate_entry.candidate_receipt().clone(), @@ -1925,14 +2056,12 @@ fn process_wakeup( async fn launch_approval( ctx: &mut impl SubsystemContext, metrics: Metrics, - mut background_tx: mpsc::Sender, session_index: SessionIndex, - candidate: &CandidateReceipt, + candidate: CandidateReceipt, validator_index: ValidatorIndex, block_hash: Hash, - candidate_index: usize, backing_group: GroupIndex, -) -> SubsystemResult>> { +) -> SubsystemResult> { let (a_tx, a_rx) = oneshot::channel(); let (code_tx, code_rx) = oneshot::channel(); @@ -1988,6 +2117,7 @@ async fn launch_approval( let candidate = candidate.clone(); let metrics_guard = StaleGuard(Some(metrics)); + let mut sender = ctx.sender().clone(); let background = async move { // Force the move of the timer into the background task. let _timer = timer; @@ -1997,35 +2127,52 @@ async fn launch_approval( .with_stage(jaeger::Stage::ApprovalChecking); let available_data = match a_rx.await { - Err(_) => return, + Err(_) => return ApprovalState::failed( + validator_index, + candidate_hash, + ), Ok(Ok(a)) => a, - Ok(Err(RecoveryError::Unavailable)) => { - tracing::warn!( - target: LOG_TARGET, - "Data unavailable for candidate {:?}", - (candidate_hash, candidate.descriptor.para_id), - ); - // do nothing. we'll just be a no-show and that'll cause others to rise up. - metrics_guard.take().on_approval_unavailable(); - return; - } - Ok(Err(RecoveryError::Invalid)) => { - tracing::warn!( - target: LOG_TARGET, - "Data recovery invalid for candidate {:?}", - (candidate_hash, candidate.descriptor.para_id), - ); + Ok(Err(e)) => { + match &e { + &RecoveryError::Unavailable => { + tracing::warn!( + target: LOG_TARGET, + "Data unavailable for candidate {:?}", + (candidate_hash, candidate.descriptor.para_id), + ); + // do nothing. we'll just be a no-show and that'll cause others to rise up. + metrics_guard.take().on_approval_unavailable(); + } + &RecoveryError::Invalid => { + tracing::warn!( + target: LOG_TARGET, + "Data recovery invalid for candidate {:?}", + (candidate_hash, candidate.descriptor.para_id), + ); - // TODO: dispute. Either the merkle trie is bad or the erasure root is. - // https://github.com/paritytech/polkadot/issues/2176 - metrics_guard.take().on_approval_invalid(); - return; + // TODO: dispute. Either the merkle trie is bad or the erasure root is. + // https://github.com/paritytech/polkadot/issues/2176 + metrics_guard.take().on_approval_invalid(); + } + } + return ApprovalState::failed( + validator_index, + candidate_hash, + ); } }; let validation_code = match code_rx.await { - Err(_) => return, - Ok(Err(_)) => return, + Err(_) => + return ApprovalState::failed( + validator_index, + candidate_hash, + ), + Ok(Err(_)) => + return ApprovalState::failed( + validator_index, + candidate_hash, + ), Ok(Ok(Some(code))) => code, Ok(Ok(None)) => { tracing::warn!( @@ -2038,23 +2185,31 @@ async fn launch_approval( // No dispute necessary, as this indicates that the chain is not behaving // according to expectations. metrics_guard.take().on_approval_unavailable(); - return; + return ApprovalState::failed( + validator_index, + candidate_hash, + ); } }; let (val_tx, val_rx) = oneshot::channel(); let para_id = candidate.descriptor.para_id; - let _ = background_tx.send(BackgroundRequest::CandidateValidation( + + sender.send_message(CandidateValidationMessage::ValidateFromExhaustive( available_data.validation_data, validation_code, candidate.descriptor, available_data.pov, val_tx, - )).await; + ).into()).await; match val_rx.await { - Err(_) => return, + Err(_) => + return ApprovalState::failed( + validator_index, + candidate_hash, + ), Ok(Ok(ValidationResult::Valid(_, _))) => { // Validation checked out. Issue an approval command. If the underlying service is unreachable, // then there isn't anything we can do. @@ -2067,11 +2222,10 @@ async fn launch_approval( ); let _ = metrics_guard.take(); - let _ = background_tx.send(BackgroundRequest::ApprovalVote(ApprovalVoteRequest { + return ApprovalState::approved( validator_index, - block_hash, - candidate_index, - })).await; + candidate_hash, + ); } Ok(Ok(ValidationResult::Invalid(reason))) => { tracing::warn!( @@ -2085,6 +2239,11 @@ async fn launch_approval( // TODO: issue dispute, but not for timeouts. // https://github.com/paritytech/polkadot/issues/2176 metrics_guard.take().on_approval_invalid(); + + return ApprovalState::failed( + validator_index, + candidate_hash, + ); } Ok(Err(e)) => { tracing::error!( @@ -2093,26 +2252,28 @@ async fn launch_approval( "Failed to validate candidate due to internal error", ); metrics_guard.take().on_approval_error(); - return + return ApprovalState::failed( + validator_index, + candidate_hash, + ); } } }; let (background, remote_handle) = background.remote_handle(); ctx.spawn("approval-checks", Box::pin(background)) - .map(move |()| Some(remote_handle)) + .map(move |()| remote_handle) } // Issue and import a local approval vote. Should only be invoked after approval checks // have been done. fn issue_approval( - ctx: &mut impl SubsystemContext, + ctx: &mut impl SubsystemSender, state: &State, metrics: &Metrics, - request: ApprovalVoteRequest, + candidate_hash: CandidateHash, + ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest, ) -> SubsystemResult> { - let ApprovalVoteRequest { validator_index, block_hash, candidate_index } = request; - let block_entry = match state.db.load_block_entry(&block_hash)? { Some(b) => b, None => { @@ -2122,6 +2283,25 @@ fn issue_approval( } }; + let candidate_index = match block_entry + .candidates() + .iter() + .position(|e| e.1 == candidate_hash) + { + None => { + tracing::warn!( + target: LOG_TARGET, + "Candidate hash {} is not present in the block entry's candidates for relay block {}", + candidate_hash, + block_entry.parent_hash(), + ); + + metrics.on_approval_error(); + return Ok(Vec::new()); + } + Some(idx) => idx, + }; + let session_info = match state.session_info(block_entry.session()) { Some(s) => s, None => { @@ -2137,7 +2317,7 @@ fn issue_approval( } }; - let candidate_hash = match block_entry.candidate(candidate_index) { + let candidate_hash = match block_entry.candidate(candidate_index as usize) { Some((_, h)) => h.clone(), None => { tracing::warn!(