mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 04:41:02 +00:00
Update dispute participation on active leaves update (#6303)
* Passed candidate events from scraper to participation * First draft PR 5875 * Added support for timestamp in changes * Some necessary refactoring * Removed SessionIndex from unconfirmed_disputes key * Removed duplicate logic in import statements * Replaced queue_participation call with re-prio * Simplifying refactor. Backed were already handled * Removed unneeded spam slots logic * Implementers guide edits * Undid the spam slots refactor * Added comments and implementers guide edit * Added test for participation upon backing * Round of fixes + ran fmt * Round of changes + fmt * Error handling draft * Changed errors to bubble up from reprioritization * Starting to construct new test * Clarifying participation function rename * Reprio test draft * Very rough bump to priority queue test draft * Improving logging * Most concise reproduction of error on third import * Add `handle_approval_vote_request` * Removing reprioritization on included event test * Removing unneeded test config * cargo fmt * Test works * Fixing final nits * Tweaks to test Tsveto figured out Co-authored-by: eskimor <eskimor@no-such-url.com> Co-authored-by: Tsvetomir Dimitrov <tsvetomir@parity.io>
This commit is contained in:
@@ -269,8 +269,13 @@ impl Initialized {
|
||||
update: ActiveLeavesUpdate,
|
||||
now: u64,
|
||||
) -> Result<()> {
|
||||
let on_chain_votes =
|
||||
let scraped_updates =
|
||||
self.scraper.process_active_leaves_update(ctx.sender(), &update).await?;
|
||||
log_error(
|
||||
self.participation
|
||||
.bump_to_priority_for_candidates(ctx, &scraped_updates.included_receipts)
|
||||
.await,
|
||||
)?;
|
||||
self.participation.process_active_leaves_update(ctx, &update).await?;
|
||||
|
||||
if let Some(new_leaf) = update.activated {
|
||||
@@ -308,7 +313,7 @@ impl Initialized {
|
||||
|
||||
// The `runtime-api` subsystem has an internal queue which serializes the execution,
|
||||
// so there is no point in running these in parallel.
|
||||
for votes in on_chain_votes {
|
||||
for votes in scraped_updates.on_chain_votes {
|
||||
let _ = self.process_on_chain_votes(ctx, overlay_db, votes, now).await.map_err(
|
||||
|error| {
|
||||
gum::warn!(
|
||||
@@ -416,6 +421,8 @@ impl Initialized {
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Importantly, handling import statements for backing votes also
|
||||
// clears spam slots for any newly backed candidates
|
||||
let import_result = self
|
||||
.handle_import_statements(
|
||||
ctx,
|
||||
@@ -837,8 +844,15 @@ impl Initialized {
|
||||
let new_state = import_result.new_state();
|
||||
|
||||
let is_included = self.scraper.is_candidate_included(&candidate_hash);
|
||||
|
||||
let potential_spam = !is_included && !new_state.is_confirmed() && !new_state.has_own_vote();
|
||||
let is_backed = self.scraper.is_candidate_backed(&candidate_hash);
|
||||
let has_own_vote = new_state.has_own_vote();
|
||||
let is_disputed = new_state.is_disputed();
|
||||
let has_controlled_indices = !env.controlled_indices().is_empty();
|
||||
let is_confirmed = new_state.is_confirmed();
|
||||
let potential_spam =
|
||||
!is_included && !is_backed && !new_state.is_confirmed() && !new_state.has_own_vote();
|
||||
// We participate only in disputes which are included, backed or confirmed
|
||||
let allow_participation = is_included || is_backed || is_confirmed;
|
||||
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
@@ -851,8 +865,11 @@ impl Initialized {
|
||||
"Is spam?"
|
||||
);
|
||||
|
||||
// This check is responsible for all clearing of spam slots. It runs
|
||||
// whenever a vote is imported from on or off chain, and decrements
|
||||
// slots whenever a candidate is newly backed, confirmed, or has our
|
||||
// own vote.
|
||||
if !potential_spam {
|
||||
// Former spammers have not been spammers after all:
|
||||
self.spam_slots.clear(&(session, candidate_hash));
|
||||
|
||||
// Potential spam:
|
||||
@@ -880,14 +897,6 @@ impl Initialized {
|
||||
}
|
||||
}
|
||||
|
||||
let has_own_vote = new_state.has_own_vote();
|
||||
let is_disputed = new_state.is_disputed();
|
||||
let has_controlled_indices = !env.controlled_indices().is_empty();
|
||||
let is_backed = self.scraper.is_candidate_backed(&candidate_hash);
|
||||
let is_confirmed = new_state.is_confirmed();
|
||||
// We participate only in disputes which are included, backed or confirmed
|
||||
let allow_participation = is_included || is_backed || is_confirmed;
|
||||
|
||||
// Participate in dispute if we did not cast a vote before and actually have keys to cast a
|
||||
// local vote. Disputes should fall in one of the categories below, otherwise we will refrain
|
||||
// from participation:
|
||||
|
||||
@@ -51,7 +51,10 @@ pub use queues::{ParticipationPriority, ParticipationRequest, QueueError};
|
||||
/// This should be a relatively low value, while we might have a speedup once we fetched the data,
|
||||
/// due to multi-core architectures, but the fetching itself can not be improved by parallel
|
||||
/// requests. This means that higher numbers make it harder for a single dispute to resolve fast.
|
||||
#[cfg(not(test))]
|
||||
const MAX_PARALLEL_PARTICIPATIONS: usize = 3;
|
||||
#[cfg(test)]
|
||||
pub(crate) const MAX_PARALLEL_PARTICIPATIONS: usize = 1;
|
||||
|
||||
/// Keep track of disputes we need to participate in.
|
||||
///
|
||||
@@ -212,6 +215,19 @@ impl Participation {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Moving any request concerning the given candidates from best-effort to
|
||||
/// priority, ignoring any candidates that don't have any queued participation requests.
|
||||
pub async fn bump_to_priority_for_candidates<Context>(
|
||||
&mut self,
|
||||
ctx: &mut Context,
|
||||
included_receipts: &Vec<CandidateReceipt>,
|
||||
) -> Result<()> {
|
||||
for receipt in included_receipts {
|
||||
self.queue.prioritize_if_present(ctx.sender(), receipt).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Dequeue until `MAX_PARALLEL_PARTICIPATIONS` is reached.
|
||||
async fn dequeue_until_capacity<Context>(
|
||||
&mut self,
|
||||
|
||||
@@ -159,6 +159,31 @@ impl Queues {
|
||||
self.pop_best_effort().map(|d| d.1)
|
||||
}
|
||||
|
||||
/// Reprioritizes any participation requests pertaining to the
|
||||
/// passed candidates from best effort to priority.
|
||||
pub async fn prioritize_if_present(
|
||||
&mut self,
|
||||
sender: &mut impl overseer::DisputeCoordinatorSenderTrait,
|
||||
receipt: &CandidateReceipt,
|
||||
) -> Result<()> {
|
||||
let comparator = CandidateComparator::new(sender, receipt).await?;
|
||||
self.prioritize_with_comparator(comparator)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prioritize_with_comparator(
|
||||
&mut self,
|
||||
comparator: CandidateComparator,
|
||||
) -> std::result::Result<(), QueueError> {
|
||||
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
|
||||
return Err(QueueError::PriorityFull)
|
||||
}
|
||||
if let Some(request) = self.best_effort.remove(&comparator) {
|
||||
self.priority.insert(comparator, request);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn queue_with_comparator(
|
||||
&mut self,
|
||||
comparator: CandidateComparator,
|
||||
|
||||
@@ -26,7 +26,7 @@ use polkadot_node_subsystem::{
|
||||
};
|
||||
use polkadot_node_subsystem_util::runtime::{get_candidate_events, get_on_chain_votes};
|
||||
use polkadot_primitives::v2::{
|
||||
BlockNumber, CandidateEvent, CandidateHash, Hash, ScrapedOnChainVotes,
|
||||
BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, Hash, ScrapedOnChainVotes,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
@@ -51,6 +51,24 @@ const LRU_OBSERVED_BLOCKS_CAPACITY: NonZeroUsize = match NonZeroUsize::new(20) {
|
||||
None => panic!("Observed blocks cache size must be non-zero"),
|
||||
};
|
||||
|
||||
/// ScrapedUpdates
|
||||
///
|
||||
/// Updates to on_chain_votes and included receipts for new active leaf and its unprocessed
|
||||
/// ancestors.
|
||||
///
|
||||
/// on_chain_votes: New votes as seen on chain
|
||||
/// included_receipts: Newly included parachain block candidate receipts as seen on chain
|
||||
pub struct ScrapedUpdates {
|
||||
pub on_chain_votes: Vec<ScrapedOnChainVotes>,
|
||||
pub included_receipts: Vec<CandidateReceipt>,
|
||||
}
|
||||
|
||||
impl ScrapedUpdates {
|
||||
pub fn new() -> Self {
|
||||
Self { on_chain_votes: Vec::new(), included_receipts: Vec::new() }
|
||||
}
|
||||
}
|
||||
|
||||
/// Chain scraper
|
||||
///
|
||||
/// Scrapes unfinalized chain in order to collect information from blocks.
|
||||
@@ -104,8 +122,8 @@ impl ChainScraper {
|
||||
};
|
||||
let update =
|
||||
ActiveLeavesUpdate { activated: Some(initial_head), deactivated: Default::default() };
|
||||
let votes = s.process_active_leaves_update(sender, &update).await?;
|
||||
Ok((s, votes))
|
||||
let updates = s.process_active_leaves_update(sender, &update).await?;
|
||||
Ok((s, updates.on_chain_votes))
|
||||
}
|
||||
|
||||
/// Check whether we have seen a candidate included on any chain.
|
||||
@@ -122,18 +140,19 @@ impl ChainScraper {
|
||||
///
|
||||
/// and updates current heads, so we can query candidates for all non finalized blocks.
|
||||
///
|
||||
/// Returns: On chain vote for the leaf and any ancestors we might not yet have seen.
|
||||
/// Returns: On chain votes and included candidate receipts for the leaf and any
|
||||
/// ancestors we might not yet have seen.
|
||||
pub async fn process_active_leaves_update<Sender>(
|
||||
&mut self,
|
||||
sender: &mut Sender,
|
||||
update: &ActiveLeavesUpdate,
|
||||
) -> Result<Vec<ScrapedOnChainVotes>>
|
||||
) -> Result<ScrapedUpdates>
|
||||
where
|
||||
Sender: overseer::DisputeCoordinatorSenderTrait,
|
||||
{
|
||||
let activated = match update.activated.as_ref() {
|
||||
Some(activated) => activated,
|
||||
None => return Ok(Vec::new()),
|
||||
None => return Ok(ScrapedUpdates::new()),
|
||||
};
|
||||
|
||||
// Fetch ancestry up to last finalized block.
|
||||
@@ -147,20 +166,22 @@ impl ChainScraper {
|
||||
|
||||
let block_hashes = std::iter::once(activated.hash).chain(ancestors);
|
||||
|
||||
let mut on_chain_votes = Vec::new();
|
||||
let mut scraped_updates = ScrapedUpdates::new();
|
||||
for (block_number, block_hash) in block_numbers.zip(block_hashes) {
|
||||
gum::trace!(?block_number, ?block_hash, "In ancestor processing.");
|
||||
|
||||
self.process_candidate_events(sender, block_number, block_hash).await?;
|
||||
let receipts_for_block =
|
||||
self.process_candidate_events(sender, block_number, block_hash).await?;
|
||||
scraped_updates.included_receipts.extend(receipts_for_block);
|
||||
|
||||
if let Some(votes) = get_on_chain_votes(sender, block_hash).await? {
|
||||
on_chain_votes.push(votes);
|
||||
scraped_updates.on_chain_votes.push(votes);
|
||||
}
|
||||
}
|
||||
|
||||
self.last_observed_blocks.put(activated.hash, ());
|
||||
|
||||
Ok(on_chain_votes)
|
||||
Ok(scraped_updates)
|
||||
}
|
||||
|
||||
/// Prune finalized candidates.
|
||||
@@ -187,17 +208,21 @@ impl ChainScraper {
|
||||
/// Process candidate events of a block.
|
||||
///
|
||||
/// Keep track of all included and backed candidates.
|
||||
///
|
||||
/// Returns freshly included candidate receipts
|
||||
async fn process_candidate_events<Sender>(
|
||||
&mut self,
|
||||
sender: &mut Sender,
|
||||
block_number: BlockNumber,
|
||||
block_hash: Hash,
|
||||
) -> Result<()>
|
||||
) -> Result<Vec<CandidateReceipt>>
|
||||
where
|
||||
Sender: overseer::DisputeCoordinatorSenderTrait,
|
||||
{
|
||||
let events = get_candidate_events(sender, block_hash).await?;
|
||||
let mut included_receipts: Vec<CandidateReceipt> = Vec::new();
|
||||
// Get included and backed events:
|
||||
for ev in get_candidate_events(sender, block_hash).await? {
|
||||
for ev in events {
|
||||
match ev {
|
||||
CandidateEvent::CandidateIncluded(receipt, _, _, _) => {
|
||||
let candidate_hash = receipt.hash();
|
||||
@@ -208,6 +233,7 @@ impl ChainScraper {
|
||||
"Processing included event"
|
||||
);
|
||||
self.included_candidates.insert(block_number, candidate_hash);
|
||||
included_receipts.push(receipt);
|
||||
},
|
||||
CandidateEvent::CandidateBacked(receipt, _, _, _) => {
|
||||
let candidate_hash = receipt.hash();
|
||||
@@ -224,7 +250,7 @@ impl ChainScraper {
|
||||
},
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(included_receipts)
|
||||
}
|
||||
|
||||
/// Returns ancestors of `head` in the descending order, stopping
|
||||
|
||||
@@ -169,6 +169,7 @@ struct TestState {
|
||||
config: Config,
|
||||
clock: MockClock,
|
||||
headers: HashMap<Hash, Header>,
|
||||
block_num_to_header: HashMap<BlockNumber, Hash>,
|
||||
last_block: Hash,
|
||||
// last session the subsystem knows about.
|
||||
known_session: Option<SessionIndex>,
|
||||
@@ -225,6 +226,8 @@ impl Default for TestState {
|
||||
|
||||
let mut headers = HashMap::new();
|
||||
let _ = headers.insert(last_block, genesis_header.clone());
|
||||
let mut block_num_to_header = HashMap::new();
|
||||
let _ = block_num_to_header.insert(genesis_header.number, last_block);
|
||||
|
||||
TestState {
|
||||
validators: validators.into_iter().map(|(pair, _)| pair).collect(),
|
||||
@@ -236,6 +239,7 @@ impl Default for TestState {
|
||||
config,
|
||||
clock: MockClock::default(),
|
||||
headers,
|
||||
block_num_to_header,
|
||||
last_block,
|
||||
known_session: None,
|
||||
}
|
||||
@@ -262,6 +266,7 @@ impl TestState {
|
||||
let block_hash = block_header.hash();
|
||||
|
||||
let _ = self.headers.insert(block_hash, block_header.clone());
|
||||
let _ = self.block_num_to_header.insert(block_header.number, block_hash);
|
||||
self.last_block = block_hash;
|
||||
|
||||
gum::debug!(?block_number, "Activating block in activate_leaf_at_session.");
|
||||
@@ -390,6 +395,27 @@ impl TestState {
|
||||
);
|
||||
finished_steps.got_scraping_information = true;
|
||||
tx.send(Ok(0)).unwrap();
|
||||
|
||||
// If the activated block number is > 1 the scraper will ask for block ancestors. Handle this case.
|
||||
if block_number > 1 {
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::ChainApi(ChainApiMessage::Ancestors{
|
||||
hash,
|
||||
k,
|
||||
response_channel,
|
||||
}) => {
|
||||
assert_eq!(hash, block_hash); // A bit restrictive, remove if it causes problems.
|
||||
let target_header = self.headers.get(&hash).expect("The function is called for this block so it should exist");
|
||||
let mut response = Vec::new();
|
||||
for i in target_header.number.saturating_sub(k as u32)..target_header.number {
|
||||
response.push(self.block_num_to_header.get(&i).expect("headers and block_num_to_header should always be in sync").clone());
|
||||
}
|
||||
let _ = response_channel.send(Ok(response));
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
@@ -580,7 +606,34 @@ fn test_harness<F>(test: F) -> TestState
|
||||
where
|
||||
F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, TestState>,
|
||||
{
|
||||
TestState::default().resume(test)
|
||||
let mut test_state = TestState::default();
|
||||
|
||||
// Add two more blocks after the genesis (which is created in `default()`)
|
||||
let h1 = Header {
|
||||
parent_hash: test_state.last_block.clone(),
|
||||
number: 1,
|
||||
digest: dummy_digest(),
|
||||
state_root: dummy_hash(),
|
||||
extrinsics_root: dummy_hash(),
|
||||
};
|
||||
let h1_hash = h1.hash();
|
||||
test_state.headers.insert(h1_hash.clone(), h1);
|
||||
test_state.block_num_to_header.insert(1, h1_hash.clone());
|
||||
test_state.last_block = h1_hash;
|
||||
|
||||
let h2 = Header {
|
||||
parent_hash: test_state.last_block.clone(),
|
||||
number: 2,
|
||||
digest: dummy_digest(),
|
||||
state_root: dummy_hash(),
|
||||
extrinsics_root: dummy_hash(),
|
||||
};
|
||||
let h2_hash = h2.hash();
|
||||
test_state.headers.insert(h2_hash.clone(), h2);
|
||||
test_state.block_num_to_header.insert(2, h2_hash.clone());
|
||||
test_state.last_block = h2_hash;
|
||||
|
||||
test_state.resume(test)
|
||||
}
|
||||
|
||||
/// Handle participation messages.
|
||||
@@ -648,6 +701,18 @@ pub async fn handle_approval_vote_request(
|
||||
);
|
||||
}
|
||||
|
||||
/// Handle block number request. In the context of these tests this message is required for
|
||||
/// handling comparator creation for enqueuing participations.
|
||||
async fn handle_get_block_number(ctx_handle: &mut VirtualOverseer, test_state: &TestState) {
|
||||
assert_matches!(
|
||||
ctx_handle.recv().await,
|
||||
AllMessages::ChainApi(
|
||||
ChainApiMessage::BlockNumber(hash, tx)) => {
|
||||
tx.send(Ok(test_state.headers.get(&hash).map(|r| r.number))).unwrap();
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn too_many_unconfirmed_statements_are_considered_spam() {
|
||||
test_harness(|mut test_state, mut virtual_overseer| {
|
||||
@@ -1273,7 +1338,7 @@ fn backing_statements_import_works_and_no_spam() {
|
||||
})
|
||||
.await;
|
||||
|
||||
// Result should be valid, because our node participated, so spam slots are cleared:
|
||||
// Import should be valid, as spam slots were not filled
|
||||
assert_matches!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
|
||||
@@ -3013,7 +3078,7 @@ fn participation_for_included_candidates() {
|
||||
|
||||
assert_eq!(rx.await.unwrap().len(), 1);
|
||||
|
||||
// check if we have participated (casted a vote)
|
||||
// check if we have participated (cast a vote)
|
||||
let (tx, rx) = oneshot::channel();
|
||||
virtual_overseer
|
||||
.send(FromOrchestra::Communication {
|
||||
@@ -3035,3 +3100,252 @@ fn participation_for_included_candidates() {
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
/// Shows that importing backing votes when a backing event is being processed
|
||||
/// results in participation.
|
||||
#[test]
|
||||
fn local_participation_in_dispute_for_backed_candidate() {
|
||||
test_harness(|mut test_state, mut virtual_overseer| {
|
||||
Box::pin(async move {
|
||||
let session = 1;
|
||||
|
||||
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
|
||||
|
||||
let candidate_receipt = make_valid_candidate_receipt();
|
||||
let candidate_hash = candidate_receipt.hash();
|
||||
|
||||
// Step 1: Show that we don't participate when not backed, confirmed, or included
|
||||
|
||||
// activate leaf - without candidate backed event
|
||||
test_state
|
||||
.activate_leaf_at_session(&mut virtual_overseer, session, 1, vec![])
|
||||
.await;
|
||||
|
||||
// generate two votes
|
||||
let (valid_vote, invalid_vote) = generate_opposing_votes_pair(
|
||||
&test_state,
|
||||
ValidatorIndex(1),
|
||||
ValidatorIndex(2),
|
||||
candidate_hash,
|
||||
session,
|
||||
VoteType::Explicit,
|
||||
)
|
||||
.await;
|
||||
|
||||
virtual_overseer
|
||||
.send(FromOrchestra::Communication {
|
||||
msg: DisputeCoordinatorMessage::ImportStatements {
|
||||
candidate_receipt: candidate_receipt.clone(),
|
||||
session,
|
||||
statements: vec![
|
||||
(valid_vote, ValidatorIndex(1)),
|
||||
(invalid_vote, ValidatorIndex(2)),
|
||||
],
|
||||
pending_confirmation: None,
|
||||
},
|
||||
})
|
||||
.await;
|
||||
|
||||
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
|
||||
.await;
|
||||
|
||||
assert_matches!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await, None);
|
||||
|
||||
// Step 2: Show that once backing votes are processed we participate
|
||||
|
||||
// Activate leaf: With candidate backed event
|
||||
test_state
|
||||
.activate_leaf_at_session(
|
||||
&mut virtual_overseer,
|
||||
session,
|
||||
1,
|
||||
vec![make_candidate_backed_event(candidate_receipt.clone())],
|
||||
)
|
||||
.await;
|
||||
|
||||
let backing_valid = test_state
|
||||
.issue_backing_statement_with_index(ValidatorIndex(3), candidate_hash, session)
|
||||
.await;
|
||||
|
||||
virtual_overseer
|
||||
.send(FromOrchestra::Communication {
|
||||
msg: DisputeCoordinatorMessage::ImportStatements {
|
||||
candidate_receipt: candidate_receipt.clone(),
|
||||
session,
|
||||
statements: vec![(backing_valid, ValidatorIndex(3))],
|
||||
pending_confirmation: None,
|
||||
},
|
||||
})
|
||||
.await;
|
||||
|
||||
participation_with_distribution(
|
||||
&mut virtual_overseer,
|
||||
&candidate_hash,
|
||||
candidate_receipt.commitments_hash,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Check for our 1 active dispute
|
||||
let (tx, rx) = oneshot::channel();
|
||||
virtual_overseer
|
||||
.send(FromOrchestra::Communication {
|
||||
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
|
||||
})
|
||||
.await;
|
||||
|
||||
assert_eq!(rx.await.unwrap().len(), 1);
|
||||
|
||||
// check if we have participated (casted a vote)
|
||||
let (tx, rx) = oneshot::channel();
|
||||
virtual_overseer
|
||||
.send(FromOrchestra::Communication {
|
||||
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
|
||||
vec![(session, candidate_hash)],
|
||||
tx,
|
||||
),
|
||||
})
|
||||
.await;
|
||||
|
||||
let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
|
||||
assert_eq!(votes.valid.raw().len(), 3); // 3 => 1 initial vote, 1 backing vote, and our vote
|
||||
assert_eq!(votes.invalid.len(), 1);
|
||||
|
||||
// Wrap up
|
||||
virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
|
||||
|
||||
test_state
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
/// Shows that when a candidate_included event is scraped from the chain we
|
||||
/// reprioritize any participation requests pertaining to that candidate.
|
||||
/// This involves moving the request for this candidate from the best effort
|
||||
/// queue to the priority queue.
|
||||
#[test]
|
||||
fn participation_requests_reprioritized_for_newly_included() {
|
||||
test_harness(|mut test_state, mut virtual_overseer| {
|
||||
Box::pin(async move {
|
||||
let session = 1;
|
||||
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
|
||||
let mut receipts: Vec<CandidateReceipt> = Vec::new();
|
||||
|
||||
// Generate all receipts
|
||||
for repetition in 1..=3u8 {
|
||||
// Building candidate receipts
|
||||
let mut candidate_receipt = make_valid_candidate_receipt();
|
||||
candidate_receipt.descriptor.pov_hash = Hash::from(
|
||||
[repetition; 32], // Altering this receipt so its hash will be changed
|
||||
);
|
||||
// Set consecutive parents (starting from zero). They will order the candidates for participation.
|
||||
let parent_block_num: BlockNumber = repetition as BlockNumber - 1;
|
||||
candidate_receipt.descriptor.relay_parent =
|
||||
test_state.block_num_to_header.get(&parent_block_num).unwrap().clone();
|
||||
receipts.push(candidate_receipt.clone());
|
||||
}
|
||||
|
||||
// Mark all candidates as backed, so their participation requests make it to best effort.
|
||||
// These calls must all occur before including the candidates due to test overseer
|
||||
// oddities.
|
||||
let mut candidate_events = Vec::new();
|
||||
for r in receipts.iter() {
|
||||
candidate_events.push(make_candidate_backed_event(r.clone()))
|
||||
}
|
||||
test_state
|
||||
.activate_leaf_at_session(&mut virtual_overseer, session, 1, candidate_events)
|
||||
.await;
|
||||
|
||||
for (idx, candidate_receipt) in receipts.iter().enumerate() {
|
||||
let candidate_hash = candidate_receipt.hash();
|
||||
|
||||
// Create votes for candidates
|
||||
let (valid_vote, invalid_vote) = generate_opposing_votes_pair(
|
||||
&test_state,
|
||||
ValidatorIndex(1),
|
||||
ValidatorIndex(2),
|
||||
candidate_hash,
|
||||
session,
|
||||
VoteType::Explicit,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Import votes for candidates
|
||||
virtual_overseer
|
||||
.send(FromOrchestra::Communication {
|
||||
msg: DisputeCoordinatorMessage::ImportStatements {
|
||||
candidate_receipt: candidate_receipt.clone(),
|
||||
session,
|
||||
statements: vec![
|
||||
(valid_vote, ValidatorIndex(1)),
|
||||
(invalid_vote, ValidatorIndex(2)),
|
||||
],
|
||||
pending_confirmation: None,
|
||||
},
|
||||
})
|
||||
.await;
|
||||
|
||||
// Handle corresponding messages to unblock import
|
||||
// we need to handle `ApprovalVotingMessage::GetApprovalSignaturesForCandidate` for import
|
||||
handle_approval_vote_request(
|
||||
&mut virtual_overseer,
|
||||
&candidate_hash,
|
||||
HashMap::new(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// We'll trigger participation for the first `MAX_PARALLEL_PARTICIPATIONS` candidates.
|
||||
// The rest will be queued => we need to handle `ChainApiMessage::BlockNumber` for them.
|
||||
if idx >= crate::participation::MAX_PARALLEL_PARTICIPATIONS {
|
||||
// We send the `idx` as parent block number, because it is used for ordering.
|
||||
// This way we get predictable ordering and participation.
|
||||
handle_get_block_number(&mut virtual_overseer, &test_state).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Generate included event for one of the candidates here
|
||||
test_state
|
||||
.activate_leaf_at_session(
|
||||
&mut virtual_overseer,
|
||||
session,
|
||||
2,
|
||||
vec![make_candidate_included_event(
|
||||
receipts.last().expect("There is more than one candidate").clone(),
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
|
||||
// NB: The checks below are a bit racy. In theory candidate 2 can be processed even before candidate 0 and this is okay. If any
|
||||
// of the asserts in the two functions after this comment fail -> rework `participation_with_distribution` to expect a set of
|
||||
// commitment hashes instead of just one.
|
||||
|
||||
// This is the candidate for which participation was started initially (`MAX_PARALLEL_PARTICIPATIONS` threshold was not yet hit)
|
||||
participation_with_distribution(
|
||||
&mut virtual_overseer,
|
||||
&receipts.get(0).expect("There is more than one candidate").hash(),
|
||||
receipts.first().expect("There is more than one candidate").commitments_hash,
|
||||
)
|
||||
.await;
|
||||
|
||||
// This one should have been prioritized
|
||||
participation_with_distribution(
|
||||
&mut virtual_overseer,
|
||||
&receipts.get(2).expect("There is more than one candidate").hash(),
|
||||
receipts.last().expect("There is more than one candidate").commitments_hash,
|
||||
)
|
||||
.await;
|
||||
|
||||
// And this is the last one
|
||||
participation_with_distribution(
|
||||
&mut virtual_overseer,
|
||||
&receipts.get(1).expect("There is more than one candidate").hash(),
|
||||
receipts.first().expect("There is more than one candidate").commitments_hash,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Wrap up
|
||||
virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
|
||||
|
||||
test_state
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
@@ -398,11 +398,22 @@ and only if all of the following conditions are satisfied:
|
||||
* the dispute is not confirmed
|
||||
* we haven't cast a vote for the dispute
|
||||
|
||||
Whenever any vote on a dispute is imported these conditions are checked. If the
|
||||
dispute is found not to be potential spam, then spam slots for the disputed candidate hash are cleared. This decrements the spam count for every validator
|
||||
which had voted invalid.
|
||||
|
||||
To keep spam slots from filling up unnecessarily we want to clear spam slots
|
||||
whenever a candidate is seen to be backed or included. Fortunately this behavior
|
||||
is acheived by clearing slots on vote import as described above. Because on chain
|
||||
backing votes are processed when a block backing the disputed candidate is discovered, spam slots are cleared for every backed candidate. Included
|
||||
candidates have also been seen as backed on the same fork, so decrementing spam
|
||||
slots is handled in that case as well.
|
||||
|
||||
The reason this works is because we only need to worry about actual dispute
|
||||
votes. Import of backing votes are already rate limited and concern only real
|
||||
candidates for approval votes a similar argument holds (if they come from
|
||||
candidates. For approval votes a similar argument holds (if they come from
|
||||
approval-voting), but we also don't import them until a dispute already
|
||||
concluded. For actual dispute votes, we need two opposing votes, so there must be
|
||||
concluded. For actual dispute votes we need two opposing votes, so there must be
|
||||
an explicit `invalid` vote in the import. Only a third of the validators can be
|
||||
malicious, so spam disk usage is limited to `2*vote_size*n/3*NUM_SPAM_SLOTS`, with
|
||||
`n` being the number of validators.
|
||||
@@ -516,16 +527,14 @@ We only ever care about disputes for candidates that have been included on at
|
||||
least some chain (became available). This is because the availability system was
|
||||
designed for precisely that: Only with inclusion (availability) we have
|
||||
guarantees about the candidate to actually be available. Because only then we
|
||||
have guarantees that malicious backers can be reliably checked and slashed. The
|
||||
system was also designed for non included candidates to not pose any threat to
|
||||
the system.
|
||||
have guarantees that malicious backers can be reliably checked and slashed. Also, by design non included candidates do not pose any threat to the system.
|
||||
|
||||
One could think of an (additional) dispute system to make it possible to dispute
|
||||
any candidate that has been proposed by a validator, no matter whether it got
|
||||
successfully included or even backed. Unfortunately, it would be very brittle
|
||||
(no availability) and also spam protection would be way harder than for the
|
||||
disputes handled by the dispute-coordinator. In fact all described spam handling
|
||||
strategies above would simply be not available.
|
||||
disputes handled by the dispute-coordinator. In fact, all the spam handling
|
||||
strategies described above would simply be unavailable.
|
||||
|
||||
It is worth thinking about who could actually raise such disputes anyway:
|
||||
Approval checkers certainly not, as they will only ever check once availability
|
||||
|
||||
Reference in New Issue
Block a user