Issue 4804: Notify chain selection of concluded disputes directly (#6512)

* Setting up new ChainSelectionMessage

* Partial first pass

* Got dispute conclusion data to provisioner

* Finished first draft for 4804 code

* A bit of polish and code comments

* cargo fmt

* Implementers guide and code comments

* More formatting, and naming issues

* Wrote test for ChainSelection side of change

* Added dispute coordinator side test

* FMT

* Addressing Marcin's comments

* fmt

* Addressing further Marcin comment

* Removing unnecessary test line

* Rough draft addressing Robert changes

* Clean up and test modification

* Majorly refactored scraper change

* Minor fixes for ChainSelection

* Polish and fmt

* Condensing inclusions per candidate logic

* Addressing Tsveto's comments

* Addressing Robert's Comments

* Altered inclusions struct to use nested BTreeMaps

* Naming fix

* Fixing inclusions struct comments

* Update node/core/dispute-coordinator/src/scraping/mod.rs

Add comment to split_off() use

Co-authored-by: Marcin S. <marcin@bytedude.com>

* Optimizing removal at block height for inclusions

* fmt

* Using copy trait

Co-authored-by: Marcin S. <marcin@bytedude.com>
This commit is contained in:
Bradley Olson
2023-01-18 18:06:34 -08:00
committed by GitHub
parent de7378efe7
commit 90aa798b76
12 changed files with 537 additions and 46 deletions
@@ -466,6 +466,10 @@ where
let _ = tx.send(best_containing);
}
ChainSelectionMessage::RevertBlocks(blocks_to_revert) => {
let write_ops = handle_revert_blocks(backend, blocks_to_revert)?;
backend.write(write_ops)?;
}
}
}
}
@@ -678,6 +682,21 @@ fn handle_approved_block(backend: &mut impl Backend, approved_block: Hash) -> Re
backend.write(ops)
}
// Here we revert a provided group of blocks. The most common cause for this is that
// the dispute coordinator has notified chain selection of a dispute which concluded
// against a candidate.
fn handle_revert_blocks(
backend: &impl Backend,
blocks_to_revert: Vec<(BlockNumber, Hash)>,
) -> Result<Vec<BackendWriteOp>, Error> {
let mut overlay = OverlayedBackend::new(backend);
for (block_number, block_hash) in blocks_to_revert {
tree::apply_single_reversion(&mut overlay, block_hash, block_number)?;
}
Ok(overlay.into_write_ops().collect())
}
fn detect_stagnant(
backend: &mut impl Backend,
now: Timestamp,
@@ -2014,3 +2014,106 @@ fn stagnant_makes_childless_parent_leaf() {
virtual_overseer
})
}
#[test]
fn revert_blocks_message_triggers_proper_reversion() {
test_harness(|backend, _, mut virtual_overseer| async move {
// Building mini chain with 1 finalized block and 3 unfinalized blocks
let finalized_number = 0;
let finalized_hash = Hash::repeat_byte(0);
let (head_hash, built_chain) =
construct_chain_on_base(vec![1, 2, 3], finalized_number, finalized_hash, |_| {});
import_blocks_into(
&mut virtual_overseer,
&backend,
Some((finalized_number, finalized_hash)),
built_chain.clone(),
)
.await;
// Checking mini chain
assert_backend_contains(&backend, built_chain.iter().map(|&(ref h, _)| h));
assert_leaves(&backend, vec![head_hash]);
assert_leaves_query(&mut virtual_overseer, vec![head_hash]).await;
let block_1_hash = backend.load_blocks_by_number(1).unwrap().get(0).unwrap().clone();
let block_2_hash = backend.load_blocks_by_number(2).unwrap().get(0).unwrap().clone();
// Sending revert blocks message
let (_, write_rx) = backend.await_next_write();
virtual_overseer
.send(FromOrchestra::Communication {
msg: ChainSelectionMessage::RevertBlocks(Vec::from([(2, block_2_hash)])),
})
.await;
write_rx.await.unwrap();
// Checking results:
// Block 2 should be explicitly reverted
assert_eq!(
backend
.load_block_entry(&block_2_hash)
.unwrap()
.unwrap()
.viability
.explicitly_reverted,
true
);
// Block 3 should be non-viable, with 2 as its earliest unviable ancestor
assert_eq!(
backend
.load_block_entry(&head_hash)
.unwrap()
.unwrap()
.viability
.earliest_unviable_ancestor,
Some(block_2_hash)
);
// Block 1 should be left as the only leaf
assert_leaves(&backend, vec![block_1_hash]);
virtual_overseer
})
}
#[test]
fn revert_blocks_against_finalized_is_ignored() {
test_harness(|backend, _, mut virtual_overseer| async move {
// Building mini chain with 1 finalized block and 3 unfinalized blocks
let finalized_number = 0;
let finalized_hash = Hash::repeat_byte(0);
let (head_hash, built_chain) =
construct_chain_on_base(vec![1], finalized_number, finalized_hash, |_| {});
import_blocks_into(
&mut virtual_overseer,
&backend,
Some((finalized_number, finalized_hash)),
built_chain.clone(),
)
.await;
// Checking mini chain
assert_backend_contains(&backend, built_chain.iter().map(|&(ref h, _)| h));
// Sending dispute concluded against message
virtual_overseer
.send(FromOrchestra::Communication {
msg: ChainSelectionMessage::RevertBlocks(Vec::from([(
finalized_number,
finalized_hash,
)])),
})
.await;
// Leaf should be head if reversion of finalized was properly ignored
assert_leaves(&backend, vec![head_hash]);
assert_leaves_query(&mut virtual_overseer, vec![head_hash]).await;
virtual_overseer
})
}
+72 -35
View File
@@ -247,7 +247,7 @@ pub(crate) fn import_block(
stagnant_at: Timestamp,
) -> Result<(), Error> {
add_block(backend, block_hash, block_number, parent_hash, weight, stagnant_at)?;
apply_reversions(backend, block_hash, block_number, reversion_logs)?;
apply_ancestor_reversions(backend, block_hash, block_number, reversion_logs)?;
Ok(())
}
@@ -347,9 +347,9 @@ fn add_block(
Ok(())
}
// Assuming that a block is already imported, accepts the number of the block
// as well as a list of reversions triggered by the block in ascending order.
fn apply_reversions(
/// Assuming that a block is already imported, accepts the number of the block
/// as well as a list of reversions triggered by the block in ascending order.
fn apply_ancestor_reversions(
backend: &mut OverlayedBackend<impl Backend>,
block_hash: Hash,
block_number: BlockNumber,
@@ -358,42 +358,79 @@ fn apply_reversions(
// Note: since revert numbers are in ascending order, the expensive propagation
// of unviability is only heavy on the first log.
for revert_number in reversions {
let mut ancestor_entry =
match load_ancestor(backend, block_hash, block_number, revert_number)? {
None => {
gum::warn!(
target: LOG_TARGET,
?block_hash,
block_number,
revert_target = revert_number,
"The hammer has dropped. \
A block has indicated that its finalized ancestor be reverted. \
Please inform an adult.",
);
continue
},
Some(ancestor_entry) => {
gum::info!(
target: LOG_TARGET,
?block_hash,
block_number,
revert_target = revert_number,
revert_hash = ?ancestor_entry.block_hash,
"A block has signaled that its ancestor be reverted due to a bad parachain block.",
);
ancestor_entry
},
};
ancestor_entry.viability.explicitly_reverted = true;
propagate_viability_update(backend, ancestor_entry)?;
let maybe_block_entry = load_ancestor(backend, block_hash, block_number, revert_number)?;
revert_single_block_entry_if_present(
backend,
maybe_block_entry,
None,
revert_number,
Some(block_hash),
Some(block_number),
)?;
}
Ok(())
}
/// Marks a single block as explicitly reverted, then propagates viability updates
/// to all its children. This is triggered when the disputes subsystem signals that
/// a dispute has concluded against a candidate.
pub(crate) fn apply_single_reversion(
backend: &mut OverlayedBackend<impl Backend>,
revert_hash: Hash,
revert_number: BlockNumber,
) -> Result<(), Error> {
let maybe_block_entry = backend.load_block_entry(&revert_hash)?;
revert_single_block_entry_if_present(
backend,
maybe_block_entry,
Some(revert_hash),
revert_number,
None,
None,
)?;
Ok(())
}
fn revert_single_block_entry_if_present(
backend: &mut OverlayedBackend<impl Backend>,
maybe_block_entry: Option<BlockEntry>,
maybe_revert_hash: Option<Hash>,
revert_number: BlockNumber,
maybe_reporting_hash: Option<Hash>,
maybe_reporting_number: Option<BlockNumber>,
) -> Result<(), Error> {
match maybe_block_entry {
None => {
gum::warn!(
target: LOG_TARGET,
?maybe_revert_hash,
revert_target = revert_number,
?maybe_reporting_hash,
?maybe_reporting_number,
"The hammer has dropped. \
The protocol has indicated that a finalized block be reverted. \
Please inform an adult.",
);
},
Some(mut block_entry) => {
gum::info!(
target: LOG_TARGET,
?maybe_revert_hash,
revert_target = revert_number,
?maybe_reporting_hash,
?maybe_reporting_number,
"Unfinalized block reverted due to a bad parachain block.",
);
block_entry.viability.explicitly_reverted = true;
// Marks children of reverted block as non-viable
propagate_viability_update(backend, block_entry)?;
},
}
Ok(())
}
/// Finalize a block with the given number and hash.
///
/// This will prune all sub-trees not descending from the given block,
@@ -30,7 +30,7 @@ use polkadot_node_primitives::{
};
use polkadot_node_subsystem::{
messages::{
ApprovalVotingMessage, BlockDescription, DisputeCoordinatorMessage,
ApprovalVotingMessage, BlockDescription, ChainSelectionMessage, DisputeCoordinatorMessage,
DisputeDistributionMessage, ImportStatementsResult,
},
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal,
@@ -1023,6 +1023,22 @@ impl Initialized {
}
}
// Notify ChainSelection if a dispute has concluded against a candidate. ChainSelection
// will need to mark the candidate's relay parent as reverted.
if import_result.is_freshly_concluded_against() {
let blocks_including = self.scraper.get_blocks_including_candidate(&candidate_hash);
if blocks_including.len() > 0 {
ctx.send_message(ChainSelectionMessage::RevertBlocks(blocks_including)).await;
} else {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
?session,
"Could not find an including block for candidate against which a dispute has concluded."
);
}
}
// Update metrics:
if import_result.is_freshly_disputed() {
self.metrics.on_open();
@@ -102,15 +102,18 @@ impl ScrapedCandidates {
}
// Removes all candidates up to a given height. The candidates at the block height are NOT removed.
pub fn remove_up_to_height(&mut self, height: &BlockNumber) {
pub fn remove_up_to_height(&mut self, height: &BlockNumber) -> HashSet<CandidateHash> {
let mut candidates_modified: HashSet<CandidateHash> = HashSet::new();
let not_stale = self.candidates_by_block_number.split_off(&height);
let stale = std::mem::take(&mut self.candidates_by_block_number);
self.candidates_by_block_number = not_stale;
for candidates in stale.values() {
for c in candidates {
self.candidates.remove(c);
candidates_modified.insert(*c);
}
}
candidates_modified
}
pub fn insert(&mut self, block_number: BlockNumber, candidate_hash: CandidateHash) {
@@ -14,7 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::num::NonZeroUsize;
use std::{
collections::{BTreeMap, HashSet},
num::NonZeroUsize,
};
use futures::channel::oneshot;
use lru::LruCache;
@@ -69,9 +72,73 @@ impl ScrapedUpdates {
}
}
/// A structure meant to facilitate chain reversions in the event of a dispute
/// concluding against a candidate. Each candidate hash maps to a number of
/// block heights, which in turn map to vectors of blocks at those heights.
pub struct Inclusions {
inclusions_inner: BTreeMap<CandidateHash, BTreeMap<BlockNumber, Vec<Hash>>>,
}
impl Inclusions {
pub fn new() -> Self {
Self { inclusions_inner: BTreeMap::new() }
}
// Add parent block to the vector which has CandidateHash as an outer key and
// BlockNumber as an inner key
pub fn insert(
&mut self,
candidate_hash: CandidateHash,
block_number: BlockNumber,
block_hash: Hash,
) {
if let Some(blocks_including) = self.inclusions_inner.get_mut(&candidate_hash) {
if let Some(blocks_at_height) = blocks_including.get_mut(&block_number) {
blocks_at_height.push(block_hash);
} else {
blocks_including.insert(block_number, Vec::from([block_hash]));
}
} else {
let mut blocks_including: BTreeMap<BlockNumber, Vec<Hash>> = BTreeMap::new();
blocks_including.insert(block_number, Vec::from([block_hash]));
self.inclusions_inner.insert(candidate_hash, blocks_including);
}
}
pub fn remove_up_to_height(
&mut self,
height: &BlockNumber,
candidates_modified: HashSet<CandidateHash>,
) {
for candidate in candidates_modified {
if let Some(blocks_including) = self.inclusions_inner.get_mut(&candidate) {
// Returns everything after the given key, including the key. This works because the blocks are sorted in ascending order.
*blocks_including = blocks_including.split_off(height);
}
}
self.inclusions_inner
.retain(|_, blocks_including| blocks_including.keys().len() > 0);
}
pub fn get(&mut self, candidate: &CandidateHash) -> Vec<(BlockNumber, Hash)> {
let mut inclusions_as_vec: Vec<(BlockNumber, Hash)> = Vec::new();
if let Some(blocks_including) = self.inclusions_inner.get(candidate) {
for (height, blocks_at_height) in blocks_including.iter() {
for block in blocks_at_height {
inclusions_as_vec.push((*height, *block));
}
}
}
inclusions_as_vec
}
}
/// Chain scraper
///
/// Scrapes unfinalized chain in order to collect information from blocks.
/// Scrapes unfinalized chain in order to collect information from blocks. Chain scraping
/// during disputes enables critical spam prevention. It does so by updating two important
/// criteria determining whether a vote sent during dispute distribution is potential
/// spam. Namely, whether the candidate being voted on is backed or included.
///
/// Concretely:
///
@@ -95,6 +162,11 @@ pub struct ChainScraper {
/// We assume that ancestors of cached blocks are already processed, i.e. we have saved
/// corresponding included candidates.
last_observed_blocks: LruCache<Hash, ()>,
/// Maps included candidate hashes to one or more relay block heights and hashes.
/// These correspond to all the relay blocks which marked a candidate as included,
/// and are needed to apply reversions in case a dispute is concluded against the
/// candidate.
inclusions: Inclusions,
}
impl ChainScraper {
@@ -119,6 +191,7 @@ impl ChainScraper {
included_candidates: candidates::ScrapedCandidates::new(),
backed_candidates: candidates::ScrapedCandidates::new(),
last_observed_blocks: LruCache::new(LRU_OBSERVED_BLOCKS_CAPACITY),
inclusions: Inclusions::new(),
};
let update =
ActiveLeavesUpdate { activated: Some(initial_head), deactivated: Default::default() };
@@ -195,7 +268,9 @@ impl ChainScraper {
{
Some(key_to_prune) => {
self.backed_candidates.remove_up_to_height(&key_to_prune);
self.included_candidates.remove_up_to_height(&key_to_prune);
let candidates_modified =
self.included_candidates.remove_up_to_height(&key_to_prune);
self.inclusions.remove_up_to_height(&key_to_prune, candidates_modified);
},
None => {
// Nothing to prune. We are still in the beginning of the chain and there are not
@@ -233,6 +308,7 @@ impl ChainScraper {
"Processing included event"
);
self.included_candidates.insert(block_number, candidate_hash);
self.inclusions.insert(candidate_hash, block_number, block_hash);
included_receipts.push(receipt);
},
CandidateEvent::CandidateBacked(receipt, _, _, _) => {
@@ -318,6 +394,13 @@ impl ChainScraper {
}
return Ok(ancestors)
}
pub fn get_blocks_including_candidate(
&mut self,
candidate: &CandidateHash,
) -> Vec<(BlockNumber, Hash)> {
self.inclusions.get(candidate)
}
}
async fn get_finalized_block_number<Sender>(sender: &mut Sender) -> FatalResult<BlockNumber>
@@ -578,3 +578,73 @@ fn scraper_handles_the_same_candidate_incuded_in_two_different_block_heights() {
assert!(!scraper.is_candidate_included(&magic_candidate.hash()));
});
}
#[test]
fn inclusions_per_candidate_properly_adds_and_prunes() {
const TEST_TARGET_BLOCK_NUMBER: BlockNumber = 2;
const TEST_TARGET_BLOCK_NUMBER_2: BlockNumber = 3;
// How many blocks should we skip before sending a leaf update.
const BLOCKS_TO_SKIP: usize = 4;
futures::executor::block_on(async {
let (state, mut virtual_overseer) = TestState::new().await;
let TestState { mut chain, mut scraper, mut ctx } = state;
// 1 because `TestState` starts at leaf 1.
let next_update = (1..BLOCKS_TO_SKIP).map(|_| next_leaf(&mut chain)).last().unwrap();
let mut finalized_block_number = 1;
let expected_ancestry_len = BLOCKS_TO_SKIP - finalized_block_number as usize;
let overseer_fut = overseer_process_active_leaves_update(
&mut virtual_overseer,
&chain,
finalized_block_number,
expected_ancestry_len,
|block_num| {
if block_num == TEST_TARGET_BLOCK_NUMBER || block_num == TEST_TARGET_BLOCK_NUMBER_2
{
get_backed_and_included_candidate_events(TEST_TARGET_BLOCK_NUMBER)
} else {
vec![]
}
},
);
join(process_active_leaves_update(ctx.sender(), &mut scraper, next_update), overseer_fut)
.await;
let candidate = make_candidate_receipt(get_block_number_hash(TEST_TARGET_BLOCK_NUMBER));
// We included the same candidate at two different block heights. So both blocks in which
// the candidate is included are recorded
assert_eq!(
scraper.get_blocks_including_candidate(&candidate.hash()),
Vec::from([
(TEST_TARGET_BLOCK_NUMBER, get_block_number_hash(TEST_TARGET_BLOCK_NUMBER)),
(TEST_TARGET_BLOCK_NUMBER_2, get_block_number_hash(TEST_TARGET_BLOCK_NUMBER_2))
])
);
// After `DISPUTE_CANDIDATE_LIFETIME_AFTER_FINALIZATION` blocks the earlier inclusion should be removed
finalized_block_number =
TEST_TARGET_BLOCK_NUMBER + DISPUTE_CANDIDATE_LIFETIME_AFTER_FINALIZATION;
process_finalized_block(&mut scraper, &finalized_block_number);
// The later inclusion should still be present, as we haven't exceeded its lifetime
assert_eq!(
scraper.get_blocks_including_candidate(&candidate.hash()),
Vec::from([(
TEST_TARGET_BLOCK_NUMBER_2,
get_block_number_hash(TEST_TARGET_BLOCK_NUMBER_2)
)])
);
finalized_block_number =
TEST_TARGET_BLOCK_NUMBER_2 + DISPUTE_CANDIDATE_LIFETIME_AFTER_FINALIZATION;
process_finalized_block(&mut scraper, &finalized_block_number);
// Now both inclusions have exceeded their lifetimes after finalization and should be purged
assert!(scraper.get_blocks_including_candidate(&candidate.hash()).len() == 0);
});
}
@@ -36,7 +36,7 @@ use polkadot_node_primitives::{
};
use polkadot_node_subsystem::{
messages::{
ApprovalVotingMessage, ChainApiMessage, DisputeCoordinatorMessage,
ApprovalVotingMessage, ChainApiMessage, ChainSelectionMessage, DisputeCoordinatorMessage,
DisputeDistributionMessage, ImportStatementsResult,
},
overseer::FromOrchestra,
@@ -3281,3 +3281,152 @@ fn participation_requests_reprioritized_for_newly_included() {
})
});
}
// When a dispute has concluded against a parachain block candidate we want to notify
// the chain selection subsystem. Then chain selection can revert the relay parents of
// the disputed candidate and mark all descendants as non-viable. This direct
// notification saves time compared to letting chain selection learn about a dispute
// conclusion from an on chain revert log.
#[test]
fn informs_chain_selection_when_dispute_concluded_against() {
test_harness(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
let session = 1;
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = make_invalid_candidate_receipt();
let parent_1_number = 1;
let parent_2_number = 2;
let candidate_hash = candidate_receipt.hash();
// Including test candidate in 2 different parent blocks
let block_1_header = Header {
parent_hash: test_state.last_block,
number: parent_1_number,
digest: dummy_digest(),
state_root: dummy_hash(),
extrinsics_root: dummy_hash(),
};
let parent_1_hash = block_1_header.hash();
test_state
.activate_leaf_at_session(
&mut virtual_overseer,
session,
parent_1_number,
vec![make_candidate_included_event(candidate_receipt.clone())],
)
.await;
let block_2_header = Header {
parent_hash: test_state.last_block,
number: parent_2_number,
digest: dummy_digest(),
state_root: dummy_hash(),
extrinsics_root: dummy_hash(),
};
let parent_2_hash = block_2_header.hash();
test_state
.activate_leaf_at_session(
&mut virtual_overseer,
session,
parent_2_number,
vec![make_candidate_included_event(candidate_receipt.clone())],
)
.await;
let supermajority_threshold =
polkadot_primitives::v2::supermajority_threshold(test_state.validators.len());
let (valid_vote, invalid_vote) = generate_opposing_votes_pair(
&test_state,
ValidatorIndex(2),
ValidatorIndex(1),
candidate_hash,
session,
VoteType::Explicit,
)
.await;
let (pending_confirmation, confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(valid_vote, ValidatorIndex(2)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation: Some(pending_confirmation),
},
})
.await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
assert_matches!(confirmation_rx.await.unwrap(),
ImportStatementsResult::ValidImport => {}
);
// Use a different expected commitments hash to ensure the candidate validation returns invalid.
participation_with_distribution(
&mut virtual_overseer,
&candidate_hash,
CandidateCommitments::default().hash(),
)
.await;
let mut statements = Vec::new();
// minus 2, because of local vote and one previously imported invalid vote.
for i in (0_u32..supermajority_threshold as u32 - 2).map(|i| i + 3) {
let vote = test_state
.issue_explicit_statement_with_index(
ValidatorIndex(i),
candidate_hash,
session,
false,
)
.await;
statements.push((vote, ValidatorIndex(i as _)));
}
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_receipt: candidate_receipt.clone(),
session,
statements,
pending_confirmation: None,
},
})
.await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
// Checking that concluded dispute has signaled the reversion of all parent blocks.
assert_matches!(
virtual_overseer.recv().await,
AllMessages::ChainSelection(
ChainSelectionMessage::RevertBlocks(revert_set)
) => {
assert!(revert_set.contains(&(parent_1_number, parent_1_hash)));
assert!(revert_set.contains(&(parent_2_number, parent_2_hash)));
},
"Overseer did not receive `ChainSelectionMessage::RevertBlocks` message"
);
// Wrap up
virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
assert_matches!(
virtual_overseer.try_recv().await,
None => {}
);
test_state
})
});
}