mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 23:31:07 +00:00
Fix currently-checking-cache (#4410)
* alter currently-checking-set to launch work only on new candidates * fmt * fix compilation * address review * Introduce approvals cache test that ensures approval work is only triggered once for each Candidate Hash * Fix formatting * Address Feedback * Move final message await into handle function Co-authored-by: Chris Sosnin <chris125_@live.com> Co-authored-by: Lldenaurois <Ljdenaurois@gmail.com>
This commit is contained in:
committed by
GitHub
parent
30423f796b
commit
ca358e1288
@@ -68,7 +68,9 @@ use futures::{
|
||||
};
|
||||
|
||||
use std::{
|
||||
collections::{btree_map::Entry, BTreeMap, HashMap, HashSet},
|
||||
collections::{
|
||||
btree_map::Entry as BTMEntry, hash_map::Entry as HMEntry, BTreeMap, HashMap, HashSet,
|
||||
},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
@@ -400,7 +402,7 @@ impl Wakeups {
|
||||
}
|
||||
|
||||
// we are replacing previous wakeup with an earlier one.
|
||||
if let Entry::Occupied(mut entry) = self.wakeups.entry(*prev) {
|
||||
if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(*prev) {
|
||||
if let Some(pos) =
|
||||
entry.get().iter().position(|x| x == &(block_hash, candidate_hash))
|
||||
{
|
||||
@@ -436,7 +438,7 @@ impl Wakeups {
|
||||
});
|
||||
|
||||
for (tick, pruned) in pruned_wakeups {
|
||||
if let Entry::Occupied(mut entry) = self.wakeups.entry(tick) {
|
||||
if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(tick) {
|
||||
entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
|
||||
if entry.get().is_empty() {
|
||||
let _ = entry.remove();
|
||||
@@ -457,10 +459,10 @@ impl Wakeups {
|
||||
Some(tick) => {
|
||||
clock.wait(tick).await;
|
||||
match self.wakeups.entry(tick) {
|
||||
Entry::Vacant(_) => {
|
||||
BTMEntry::Vacant(_) => {
|
||||
panic!("entry is known to exist since `first` was `Some`; qed")
|
||||
},
|
||||
Entry::Occupied(mut entry) => {
|
||||
BTMEntry::Occupied(mut entry) => {
|
||||
let (hash, candidate_hash) = entry.get_mut().pop()
|
||||
.expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");
|
||||
|
||||
@@ -507,9 +509,7 @@ impl ApprovalState {
|
||||
}
|
||||
|
||||
struct CurrentlyCheckingSet {
|
||||
/// Invariant: The contained `Vec` needs to stay sorted as we are using `binary_search_by_key`
|
||||
/// on it.
|
||||
candidate_hash_map: HashMap<CandidateHash, Vec<Hash>>,
|
||||
candidate_hash_map: HashMap<CandidateHash, HashSet<Hash>>,
|
||||
currently_checking: FuturesUnordered<BoxFuture<'static, ApprovalState>>,
|
||||
}
|
||||
|
||||
@@ -529,21 +529,26 @@ impl CurrentlyCheckingSet {
|
||||
relay_block: Hash,
|
||||
launch_work: impl Future<Output = SubsystemResult<RemoteHandle<ApprovalState>>>,
|
||||
) -> SubsystemResult<()> {
|
||||
let val = self.candidate_hash_map.entry(candidate_hash).or_insert(Default::default());
|
||||
|
||||
if let Err(k) = val.binary_search_by_key(&relay_block, |v| *v) {
|
||||
let _ = val.insert(k, relay_block);
|
||||
let work = launch_work.await?;
|
||||
self.currently_checking.push(Box::pin(async move {
|
||||
match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
|
||||
None => ApprovalState {
|
||||
candidate_hash,
|
||||
validator_index,
|
||||
approval_outcome: ApprovalOutcome::TimedOut,
|
||||
},
|
||||
Some(approval_state) => approval_state,
|
||||
}
|
||||
}));
|
||||
match self.candidate_hash_map.entry(candidate_hash) {
|
||||
HMEntry::Occupied(mut entry) => {
|
||||
// validation already undergoing. just add the relay hash if unknown.
|
||||
entry.get_mut().insert(relay_block);
|
||||
},
|
||||
HMEntry::Vacant(entry) => {
|
||||
// validation not ongoing. launch work and time out the remote handle.
|
||||
entry.insert(HashSet::new()).insert(relay_block);
|
||||
let work = launch_work.await?;
|
||||
self.currently_checking.push(Box::pin(async move {
|
||||
match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
|
||||
None => ApprovalState {
|
||||
candidate_hash,
|
||||
validator_index,
|
||||
approval_outcome: ApprovalOutcome::TimedOut,
|
||||
},
|
||||
Some(approval_state) => approval_state,
|
||||
}
|
||||
}));
|
||||
},
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -552,7 +557,7 @@ impl CurrentlyCheckingSet {
|
||||
pub async fn next(
|
||||
&mut self,
|
||||
approvals_cache: &mut lru::LruCache<CandidateHash, ApprovalOutcome>,
|
||||
) -> (Vec<Hash>, ApprovalState) {
|
||||
) -> (HashSet<Hash>, ApprovalState) {
|
||||
if !self.currently_checking.is_empty() {
|
||||
if let Some(approval_state) = self.currently_checking.next().await {
|
||||
let out = self
|
||||
|
||||
@@ -15,18 +15,25 @@
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use super::*;
|
||||
use polkadot_node_primitives::approval::{
|
||||
AssignmentCert, AssignmentCertKind, DelayTranche, VRFOutput, VRFProof, RELAY_VRF_MODULO_CONTEXT,
|
||||
use polkadot_node_primitives::{
|
||||
approval::{
|
||||
AssignmentCert, AssignmentCertKind, DelayTranche, VRFOutput, VRFProof,
|
||||
RELAY_VRF_MODULO_CONTEXT,
|
||||
},
|
||||
AvailableData, BlockData, PoV,
|
||||
};
|
||||
use polkadot_node_subsystem::{
|
||||
messages::{AllMessages, ApprovalVotingMessage, AssignmentCheckResult},
|
||||
messages::{
|
||||
AllMessages, ApprovalVotingMessage, AssignmentCheckResult, AvailabilityRecoveryMessage,
|
||||
},
|
||||
ActivatedLeaf, ActiveLeavesUpdate, LeafStatus,
|
||||
};
|
||||
use polkadot_node_subsystem_test_helpers as test_helpers;
|
||||
use polkadot_node_subsystem_util::TimeoutExt;
|
||||
use polkadot_overseer::HeadSupportsParachains;
|
||||
use polkadot_primitives::v1::{
|
||||
CandidateEvent, CoreIndex, GroupIndex, Header, Id as ParaId, ValidatorSignature,
|
||||
CandidateCommitments, CandidateEvent, CoreIndex, GroupIndex, Header, Id as ParaId,
|
||||
ValidationCode, ValidatorSignature,
|
||||
};
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -2156,7 +2163,7 @@ fn subsystem_approved_ancestor_missing_approval() {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn subsystem_process_wakeup_trigger_assignment_launch_approval() {
|
||||
fn subsystem_validate_approvals_cache() {
|
||||
let assignment_criteria = Box::new(MockAssignmentCriteria(
|
||||
|| {
|
||||
let mut assignments = HashMap::new();
|
||||
@@ -2186,7 +2193,10 @@ fn subsystem_process_wakeup_trigger_assignment_launch_approval() {
|
||||
} = test_harness;
|
||||
|
||||
let block_hash = Hash::repeat_byte(0x01);
|
||||
let candidate_receipt = dummy_candidate_receipt(block_hash);
|
||||
let fork_block_hash = Hash::repeat_byte(0x02);
|
||||
let candidate_commitments = CandidateCommitments::default();
|
||||
let mut candidate_receipt = dummy_candidate_receipt(block_hash);
|
||||
candidate_receipt.commitments_hash = candidate_commitments.hash();
|
||||
let candidate_hash = candidate_receipt.hash();
|
||||
let slot = Slot::from(1);
|
||||
let candidate_index = 0;
|
||||
@@ -2215,6 +2225,7 @@ fn subsystem_process_wakeup_trigger_assignment_launch_approval() {
|
||||
no_show_slots: 2,
|
||||
};
|
||||
|
||||
let candidates = Some(vec![(candidate_receipt.clone(), CoreIndex(0), GroupIndex(0))]);
|
||||
ChainBuilder::new()
|
||||
.add_block(
|
||||
block_hash,
|
||||
@@ -2222,10 +2233,16 @@ fn subsystem_process_wakeup_trigger_assignment_launch_approval() {
|
||||
1,
|
||||
BlockConfig {
|
||||
slot,
|
||||
candidates: Some(vec![(candidate_receipt, CoreIndex(0), GroupIndex(0))]),
|
||||
session_info: Some(session_info),
|
||||
candidates: candidates.clone(),
|
||||
session_info: Some(session_info.clone()),
|
||||
},
|
||||
)
|
||||
.add_block(
|
||||
fork_block_hash,
|
||||
ChainBuilder::GENESIS_HASH,
|
||||
1,
|
||||
BlockConfig { slot, candidates, session_info: Some(session_info) },
|
||||
)
|
||||
.build(&mut virtual_overseer)
|
||||
.await;
|
||||
|
||||
@@ -2237,19 +2254,8 @@ fn subsystem_process_wakeup_trigger_assignment_launch_approval() {
|
||||
|
||||
futures_timer::Delay::new(Duration::from_millis(200)).await;
|
||||
|
||||
assert!(clock.inner.lock().current_wakeup_is(slot_to_tick(slot + 2)));
|
||||
clock.inner.lock().wakeup_all(slot_to_tick(slot + 2));
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
|
||||
_,
|
||||
c_index,
|
||||
)) => {
|
||||
assert_eq!(candidate_index, c_index);
|
||||
}
|
||||
);
|
||||
|
||||
assert_eq!(clock.inner.lock().wakeups.len(), 0);
|
||||
|
||||
futures_timer::Delay::new(Duration::from_millis(200)).await;
|
||||
@@ -2259,10 +2265,92 @@ fn subsystem_process_wakeup_trigger_assignment_launch_approval() {
|
||||
candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap();
|
||||
assert!(our_assignment.triggered());
|
||||
|
||||
// Handle the the next two assignment imports, where only one should trigger approvals work
|
||||
handle_double_assignment_import(&mut virtual_overseer, candidate_index).await;
|
||||
|
||||
virtual_overseer
|
||||
});
|
||||
}
|
||||
|
||||
/// Ensure that when two assignments are imported, only one triggers the Approval Checking work
|
||||
pub async fn handle_double_assignment_import(
|
||||
virtual_overseer: &mut VirtualOverseer,
|
||||
candidate_index: CandidateIndex,
|
||||
) {
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
|
||||
_,
|
||||
c_index,
|
||||
)) => {
|
||||
assert_eq!(candidate_index, c_index);
|
||||
}
|
||||
);
|
||||
|
||||
recover_available_data(virtual_overseer).await;
|
||||
fetch_validation_code(virtual_overseer).await;
|
||||
|
||||
let first_message = virtual_overseer.recv().await;
|
||||
let second_message = virtual_overseer.recv().await;
|
||||
|
||||
for msg in vec![first_message, second_message].into_iter() {
|
||||
match msg {
|
||||
AllMessages::ApprovalDistribution(
|
||||
ApprovalDistributionMessage::DistributeAssignment(_, c_index),
|
||||
) => {
|
||||
assert_eq!(candidate_index, c_index);
|
||||
},
|
||||
AllMessages::CandidateValidation(
|
||||
CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx),
|
||||
) if timeout == APPROVAL_EXECUTION_TIMEOUT => {
|
||||
tx.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
|
||||
.unwrap();
|
||||
},
|
||||
_ => panic! {},
|
||||
}
|
||||
}
|
||||
|
||||
// Assert that there are no more messages being sent by the subsystem
|
||||
assert!(overseer_recv(virtual_overseer).timeout(TIMEOUT / 2).await.is_none());
|
||||
}
|
||||
|
||||
/// Handles validation code fetch, returns the received relay parent hash.
|
||||
async fn fetch_validation_code(virtual_overseer: &mut VirtualOverseer) -> Hash {
|
||||
let validation_code = ValidationCode(Vec::new());
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
hash,
|
||||
RuntimeApiRequest::ValidationCodeByHash(
|
||||
_,
|
||||
tx,
|
||||
)
|
||||
)) => {
|
||||
tx.send(Ok(Some(validation_code))).unwrap();
|
||||
hash
|
||||
},
|
||||
"overseer did not receive runtime API request for validation code",
|
||||
)
|
||||
}
|
||||
|
||||
async fn recover_available_data(virtual_overseer: &mut VirtualOverseer) {
|
||||
let pov_block = PoV { block_data: BlockData(Vec::new()) };
|
||||
|
||||
let available_data =
|
||||
AvailableData { pov: Arc::new(pov_block), validation_data: Default::default() };
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::AvailabilityRecovery(
|
||||
AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, tx)
|
||||
) => {
|
||||
tx.send(Ok(available_data)).unwrap();
|
||||
},
|
||||
"overseer did not receive recover available data message",
|
||||
);
|
||||
}
|
||||
|
||||
struct TriggersAssignmentConfig<F1, F2> {
|
||||
our_assigned_tranche: DelayTranche,
|
||||
assign_validator_tranche: F1,
|
||||
@@ -2445,6 +2533,21 @@ async fn step_until_done(clock: &MockClock) {
|
||||
println!("relevant_ticks: {:?}", relevant_ticks);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn subsystem_process_wakeup_trigger_assignment_launch_approval() {
|
||||
triggers_assignment_test(TriggersAssignmentConfig {
|
||||
our_assigned_tranche: 0,
|
||||
assign_validator_tranche: |_| Ok(0),
|
||||
no_show_slots: 0,
|
||||
assignments_to_import: vec![1],
|
||||
approvals_to_import: vec![1],
|
||||
ticks: vec![
|
||||
10, // Alice wakeup, assignment triggered
|
||||
],
|
||||
should_be_triggered: |_| true,
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn subsystem_assignment_triggered_solo_zero_tranche() {
|
||||
triggers_assignment_test(TriggersAssignmentConfig {
|
||||
|
||||
Reference in New Issue
Block a user