Change best effort queue behaviour in dispute-coordinator (#6275)

* Change best effort queue behaviour in `dispute-coordinator`

Use the same type of queue (`BTreeMap<CandidateComparator,
ParticipationRequest>`) for best effort and priority in
`dispute-coordinator`.

Rework `CandidateComparator` to handle unavailable parent
block numbers.

Best effort queue will order disputes the same way as priority does - by
parent's block height. Disputes on candidates for which the parent's
block number can't be obtained will be treated with the lowest priority.

* Fix tests: Handle `ChainApiMessage::BlockNumber` in `handle_sync_queries`

* Some tests are deadlocking on sending messages via overseer so change `SingleItemSink`to `mpsc::Sender` with a buffer of 1

* Fix a race in test after adding a buffered queue for overseer messages

* Fix the rest of the tests

* Guide update - best-effort queue

* Guide update: clarification about spam votes

* Fix tests in `availability-distribution`

* Update comments

* Add `make_buffered_subsystem_context` in `subsystem-test-helpers`

* Code review feedback

* Code review feedback

* Code review feedback

* Don't add best effort candidate if it is already in priority queue

* Remove an old comment

* Fix insert in best_effort
This commit is contained in:
Tsvetomir Dimitrov
2022-11-17 17:41:19 +02:00
committed by GitHub
parent ad41e56e6e
commit ccad411e46
7 changed files with 329 additions and 196 deletions
@@ -14,10 +14,7 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{ use std::{cmp::Ordering, collections::BTreeMap};
cmp::Ordering,
collections::{BTreeMap, HashMap},
};
use futures::channel::oneshot; use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer}; use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
@@ -50,25 +47,14 @@ const PRIORITY_QUEUE_SIZE: usize = 20_000;
#[cfg(test)] #[cfg(test)]
const PRIORITY_QUEUE_SIZE: usize = 2; const PRIORITY_QUEUE_SIZE: usize = 2;
/// Type for counting how often a candidate was added to the best effort queue.
type BestEffortCount = u32;
/// Queues for dispute participation. /// Queues for dispute participation.
/// In both queues we have a strict ordering of candidates and participation will
/// happen in that order. Refer to `CandidateComparator` for details on the ordering.
pub struct Queues { pub struct Queues {
/// Set of best effort participation requests. /// Set of best effort participation requests.
/// best_effort: BTreeMap<CandidateComparator, ParticipationRequest>,
/// Note that as size is limited to `BEST_EFFORT_QUEUE_SIZE` we simply do a linear search for
/// the entry with the highest `added_count` to determine what dispute to participate next in.
///
/// This mechanism leads to an amplifying effect - the more validators already participated,
/// the more likely it becomes that more validators will participate soon, which should lead to
/// a quick resolution of disputes, even in the best effort queue.
best_effort: HashMap<CandidateHash, BestEffortEntry>,
/// Priority queue. /// Priority queue.
///
/// In the priority queue, we have a strict ordering of candidates and participation will
/// happen in that order.
priority: BTreeMap<CandidateComparator, ParticipationRequest>, priority: BTreeMap<CandidateComparator, ParticipationRequest>,
} }
@@ -143,14 +129,13 @@ impl ParticipationRequest {
impl Queues { impl Queues {
/// Create new `Queues`. /// Create new `Queues`.
pub fn new() -> Self { pub fn new() -> Self {
Self { best_effort: HashMap::new(), priority: BTreeMap::new() } Self { best_effort: BTreeMap::new(), priority: BTreeMap::new() }
} }
/// Will put message in queue, either priority or best effort depending on priority. /// Will put message in queue, either priority or best effort depending on priority.
/// ///
/// If the message was already previously present on best effort, it will be moved to priority /// If the message was already previously present on best effort, it will be moved to priority
/// if it considered priority now, otherwise the `added_count` on the best effort queue will be /// if it is considered priority now.
/// bumped.
/// ///
/// Returns error in case a queue was found full already. /// Returns error in case a queue was found full already.
pub async fn queue( pub async fn queue(
@@ -159,94 +144,76 @@ impl Queues {
priority: ParticipationPriority, priority: ParticipationPriority,
req: ParticipationRequest, req: ParticipationRequest,
) -> Result<()> { ) -> Result<()> {
let comparator = match priority { let comparator = CandidateComparator::new(sender, &req.candidate_receipt).await?;
ParticipationPriority::BestEffort => None,
ParticipationPriority::Priority => self.queue_with_comparator(comparator, priority, req)?;
CandidateComparator::new(sender, &req.candidate_receipt).await?,
};
self.queue_with_comparator(comparator, req)?;
Ok(()) Ok(())
} }
/// Get the next best request for dispute participation /// Get the next best request for dispute participation if any.
/// /// First the priority queue is considered and then the best effort one.
/// if any. Priority queue is always considered first, then the best effort queue based on
/// `added_count`.
pub fn dequeue(&mut self) -> Option<ParticipationRequest> { pub fn dequeue(&mut self) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() { if let Some(req) = self.pop_priority() {
// In case a candidate became best effort over time, we might have it also queued in return Some(req.1)
// the best effort queue - get rid of any such entry:
self.best_effort.remove(req.candidate_hash());
return Some(req)
} }
self.pop_best_effort() self.pop_best_effort().map(|d| d.1)
} }
fn queue_with_comparator( fn queue_with_comparator(
&mut self, &mut self,
comparator: Option<CandidateComparator>, comparator: CandidateComparator,
priority: ParticipationPriority,
req: ParticipationRequest, req: ParticipationRequest,
) -> std::result::Result<(), QueueError> { ) -> std::result::Result<(), QueueError> {
if let Some(comparator) = comparator { if priority.is_priority() {
if self.priority.len() >= PRIORITY_QUEUE_SIZE { if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull) return Err(QueueError::PriorityFull)
} }
// Remove any best effort entry: // Remove any best effort entry:
self.best_effort.remove(&req.candidate_hash); self.best_effort.remove(&comparator);
self.priority.insert(comparator, req); self.priority.insert(comparator, req);
} else { } else {
if self.priority.contains_key(&comparator) {
// The candidate is already in priority queue - don't
// add in in best effort too.
return Ok(())
}
if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE { if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE {
return Err(QueueError::BestEffortFull) return Err(QueueError::BestEffortFull)
} }
// Note: The request might have been added to priority in a previous call already, we self.best_effort.insert(comparator, req);
// take care of that case in `dequeue` (more efficient).
self.best_effort
.entry(req.candidate_hash)
.or_insert(BestEffortEntry { req, added_count: 0 })
.added_count += 1;
} }
Ok(()) Ok(())
} }
/// Get the next best from the best effort queue. /// Get best from the best effort queue.
/// fn pop_best_effort(&mut self) -> Option<(CandidateComparator, ParticipationRequest)> {
/// If there are multiple best - just pick one. return Self::pop_impl(&mut self.best_effort)
fn pop_best_effort(&mut self) -> Option<ParticipationRequest> {
let best = self.best_effort.iter().reduce(|(hash1, entry1), (hash2, entry2)| {
if entry1.added_count > entry2.added_count {
(hash1, entry1)
} else {
(hash2, entry2)
}
});
if let Some((best_hash, _)) = best {
let best_hash = best_hash.clone();
self.best_effort.remove(&best_hash).map(|e| e.req)
} else {
None
}
} }
/// Get best priority queue entry. /// Get best priority queue entry.
fn pop_priority(&mut self) -> Option<ParticipationRequest> { fn pop_priority(&mut self) -> Option<(CandidateComparator, ParticipationRequest)> {
return Self::pop_impl(&mut self.priority)
}
// `pop_best_effort` and `pop_priority` do the same but on different `BTreeMap`s. This function has
// the extracted implementation
fn pop_impl(
target: &mut BTreeMap<CandidateComparator, ParticipationRequest>,
) -> Option<(CandidateComparator, ParticipationRequest)> {
// Once https://github.com/rust-lang/rust/issues/62924 is there, we can use a simple: // Once https://github.com/rust-lang/rust/issues/62924 is there, we can use a simple:
// priority.pop_first(). // target.pop_first().
if let Some((comparator, _)) = self.priority.iter().next() { if let Some((comparator, _)) = target.iter().next() {
let comparator = comparator.clone(); let comparator = comparator.clone();
self.priority.remove(&comparator) target
.remove(&comparator)
.map(|participation_request| (comparator, participation_request))
} else { } else {
None None
} }
} }
} }
/// Entry for the best effort queue.
struct BestEffortEntry {
req: ParticipationRequest,
/// How often was the above request added to the queue.
added_count: BestEffortCount,
}
/// `Comparator` for ordering of disputes for candidates. /// `Comparator` for ordering of disputes for candidates.
/// ///
/// This `comparator` makes it possible to order disputes based on age and to ensure some fairness /// This `comparator` makes it possible to order disputes based on age and to ensure some fairness
@@ -266,9 +233,12 @@ struct BestEffortEntry {
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
#[cfg_attr(test, derive(Debug))] #[cfg_attr(test, derive(Debug))]
struct CandidateComparator { struct CandidateComparator {
/// Block number of the relay parent. /// Block number of the relay parent. It's wrapped in an `Option<>` because there are cases when
/// it can't be obtained. For example when the node is lagging behind and new leaves are received
/// with a slight delay. Candidates with unknown relay parent are treated with the lowest priority.
/// ///
/// Important, so we will be participating in oldest disputes first. /// The order enforced by `CandidateComparator` is important because we want to participate in
/// the oldest disputes first.
/// ///
/// Note: In theory it would make more sense to use the `BlockNumber` of the including /// Note: In theory it would make more sense to use the `BlockNumber` of the including
/// block, as inclusion time is the actual relevant event when it comes to ordering. The /// block, as inclusion time is the actual relevant event when it comes to ordering. The
@@ -277,8 +247,10 @@ struct CandidateComparator {
/// just using the lowest `BlockNumber` of all available including blocks - the problem is, /// just using the lowest `BlockNumber` of all available including blocks - the problem is,
/// that is not stable. If a new fork appears after the fact, we would start ordering the same /// that is not stable. If a new fork appears after the fact, we would start ordering the same
/// candidate differently, which would result in the same candidate getting queued twice. /// candidate differently, which would result in the same candidate getting queued twice.
relay_parent_block_number: BlockNumber, relay_parent_block_number: Option<BlockNumber>,
/// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates. /// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates with the
/// same relay parent block number. Candidates without `relay_parent_block_number` are ordered by
/// the `candidate_hash` (and treated with the lowest priority, as already mentioned).
candidate_hash: CandidateHash, candidate_hash: CandidateHash,
} }
@@ -287,33 +259,35 @@ impl CandidateComparator {
/// ///
/// Useful for testing. /// Useful for testing.
#[cfg(test)] #[cfg(test)]
pub fn new_dummy(block_number: BlockNumber, candidate_hash: CandidateHash) -> Self { pub fn new_dummy(block_number: Option<BlockNumber>, candidate_hash: CandidateHash) -> Self {
Self { relay_parent_block_number: block_number, candidate_hash } Self { relay_parent_block_number: block_number, candidate_hash }
} }
/// Create a candidate comparator for a given candidate. /// Create a candidate comparator for a given candidate.
/// ///
/// Returns: /// Returns:
/// `Ok(None)` in case we could not lookup the candidate's relay parent, returns a /// - `Ok(CandidateComparator{Some(relay_parent_block_number), candidate_hash})` when the
/// `FatalError` in case the chain API call fails with an unexpected error. /// relay parent can be obtained. This is the happy case.
/// - `Ok(CandidateComparator{None, candidate_hash})` in case the candidate's relay parent
/// can't be obtained.
/// - `FatalError` in case the chain API call fails with an unexpected error.
pub async fn new( pub async fn new(
sender: &mut impl overseer::DisputeCoordinatorSenderTrait, sender: &mut impl overseer::DisputeCoordinatorSenderTrait,
candidate: &CandidateReceipt, candidate: &CandidateReceipt,
) -> FatalResult<Option<Self>> { ) -> FatalResult<Self> {
let candidate_hash = candidate.hash(); let candidate_hash = candidate.hash();
let n = match get_block_number(sender, candidate.descriptor().relay_parent).await? { let n = get_block_number(sender, candidate.descriptor().relay_parent).await?;
None => {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
"Candidate's relay_parent could not be found via chain API - `CandidateComparator could not be provided!"
);
return Ok(None)
},
Some(n) => n,
};
Ok(Some(CandidateComparator { relay_parent_block_number: n, candidate_hash })) if n.is_none() {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
"Candidate's relay_parent could not be found via chain API - `CandidateComparator` \
with an empty relay parent block number will be provided!"
);
}
Ok(CandidateComparator { relay_parent_block_number: n, candidate_hash })
} }
} }
@@ -333,11 +307,28 @@ impl PartialOrd for CandidateComparator {
impl Ord for CandidateComparator { impl Ord for CandidateComparator {
fn cmp(&self, other: &Self) -> Ordering { fn cmp(&self, other: &Self) -> Ordering {
match self.relay_parent_block_number.cmp(&other.relay_parent_block_number) { return match (self.relay_parent_block_number, other.relay_parent_block_number) {
Ordering::Equal => (), (None, None) => {
o => return o, // No relay parents for both -> compare hashes
self.candidate_hash.cmp(&other.candidate_hash)
},
(Some(self_relay_parent_block_num), Some(other_relay_parent_block_num)) => {
match self_relay_parent_block_num.cmp(&other_relay_parent_block_num) {
// if the relay parent is the same for both -> compare hashes
Ordering::Equal => self.candidate_hash.cmp(&other.candidate_hash),
// if not - return the result from comparing the relay parent block numbers
o => return o,
}
},
(Some(_), None) => {
// Candidates with known relay parents are always with priority
Ordering::Less
},
(None, Some(_)) => {
// Ditto
Ordering::Greater
},
} }
self.candidate_hash.cmp(&other.candidate_hash)
} }
} }
@@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::ParticipationPriority;
use ::test_helpers::{dummy_candidate_receipt, dummy_hash}; use ::test_helpers::{dummy_candidate_receipt, dummy_hash};
use assert_matches::assert_matches; use assert_matches::assert_matches;
use polkadot_primitives::v2::{BlockNumber, Hash}; use polkadot_primitives::v2::{BlockNumber, Hash};
@@ -31,15 +32,16 @@ fn make_participation_request(hash: Hash) -> ParticipationRequest {
/// Make dummy comparator for request, based on the given block number. /// Make dummy comparator for request, based on the given block number.
fn make_dummy_comparator( fn make_dummy_comparator(
req: &ParticipationRequest, req: &ParticipationRequest,
relay_parent: BlockNumber, relay_parent: Option<BlockNumber>,
) -> CandidateComparator { ) -> CandidateComparator {
CandidateComparator::new_dummy(relay_parent, *req.candidate_hash()) CandidateComparator::new_dummy(relay_parent, *req.candidate_hash())
} }
/// Check that dequeuing acknowledges order. /// Check that dequeuing acknowledges order.
/// ///
/// Any priority item will be dequeued before any best effort items, priority items will be /// Any priority item will be dequeued before any best effort items, priority and best effort with
/// processed in order. Best effort items, based on how often they have been added. /// known parent block number items will be processed in order. Best effort items without known parent
/// block number should be treated with lowest priority.
#[test] #[test]
fn ordering_works_as_expected() { fn ordering_works_as_expected() {
let mut queue = Queues::new(); let mut queue = Queues::new();
@@ -47,36 +49,69 @@ fn ordering_works_as_expected() {
let req_prio = make_participation_request(Hash::repeat_byte(0x02)); let req_prio = make_participation_request(Hash::repeat_byte(0x02));
let req3 = make_participation_request(Hash::repeat_byte(0x03)); let req3 = make_participation_request(Hash::repeat_byte(0x03));
let req_prio_2 = make_participation_request(Hash::repeat_byte(0x04)); let req_prio_2 = make_participation_request(Hash::repeat_byte(0x04));
let req5 = make_participation_request(Hash::repeat_byte(0x05)); let req5_unknown_parent = make_participation_request(Hash::repeat_byte(0x05));
let req_full = make_participation_request(Hash::repeat_byte(0x06)); let req_full = make_participation_request(Hash::repeat_byte(0x06));
let req_prio_full = make_participation_request(Hash::repeat_byte(0x07)); let req_prio_full = make_participation_request(Hash::repeat_byte(0x07));
queue.queue_with_comparator(None, req1.clone()).unwrap();
queue queue
.queue_with_comparator(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone()) .queue_with_comparator(
make_dummy_comparator(&req1, Some(1)),
ParticipationPriority::BestEffort,
req1.clone(),
)
.unwrap(); .unwrap();
queue.queue_with_comparator(None, req3.clone()).unwrap();
queue queue
.queue_with_comparator(Some(make_dummy_comparator(&req_prio_2, 2)), req_prio_2.clone()) .queue_with_comparator(
make_dummy_comparator(&req_prio, Some(1)),
ParticipationPriority::Priority,
req_prio.clone(),
)
.unwrap();
queue
.queue_with_comparator(
make_dummy_comparator(&req3, Some(2)),
ParticipationPriority::BestEffort,
req3.clone(),
)
.unwrap();
queue
.queue_with_comparator(
make_dummy_comparator(&req_prio_2, Some(2)),
ParticipationPriority::Priority,
req_prio_2.clone(),
)
.unwrap();
queue
.queue_with_comparator(
make_dummy_comparator(&req5_unknown_parent, None),
ParticipationPriority::BestEffort,
req5_unknown_parent.clone(),
)
.unwrap(); .unwrap();
queue.queue_with_comparator(None, req3.clone()).unwrap();
queue.queue_with_comparator(None, req5.clone()).unwrap();
assert_matches!( assert_matches!(
queue.queue_with_comparator(Some(make_dummy_comparator(&req_prio_full, 3)), req_prio_full), queue.queue_with_comparator(
make_dummy_comparator(&req_prio_full, Some(3)),
ParticipationPriority::Priority,
req_prio_full
),
Err(QueueError::PriorityFull) Err(QueueError::PriorityFull)
); );
assert_matches!(queue.queue_with_comparator(None, req_full), Err(QueueError::BestEffortFull)); assert_matches!(
queue.queue_with_comparator(
make_dummy_comparator(&req_full, Some(3)),
ParticipationPriority::BestEffort,
req_full
),
Err(QueueError::BestEffortFull)
);
// Prioritized queue is ordered correctly
assert_eq!(queue.dequeue(), Some(req_prio)); assert_eq!(queue.dequeue(), Some(req_prio));
assert_eq!(queue.dequeue(), Some(req_prio_2)); assert_eq!(queue.dequeue(), Some(req_prio_2));
// So is the best-effort
assert_eq!(queue.dequeue(), Some(req1));
assert_eq!(queue.dequeue(), Some(req3)); assert_eq!(queue.dequeue(), Some(req3));
assert_matches!( assert_eq!(queue.dequeue(), Some(req5_unknown_parent));
queue.dequeue(),
Some(r) => { assert!(r == req1 || r == req5) }
);
assert_matches!(
queue.dequeue(),
Some(r) => { assert!(r == req1 || r == req5) }
);
assert_matches!(queue.dequeue(), None); assert_matches!(queue.dequeue(), None);
} }
@@ -89,23 +124,50 @@ fn candidate_is_only_dequeued_once() {
let req_best_effort_then_prio = make_participation_request(Hash::repeat_byte(0x03)); let req_best_effort_then_prio = make_participation_request(Hash::repeat_byte(0x03));
let req_prio_then_best_effort = make_participation_request(Hash::repeat_byte(0x04)); let req_prio_then_best_effort = make_participation_request(Hash::repeat_byte(0x04));
queue.queue_with_comparator(None, req1.clone()).unwrap();
queue queue
.queue_with_comparator(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone()) .queue_with_comparator(
make_dummy_comparator(&req1, None),
ParticipationPriority::BestEffort,
req1.clone(),
)
.unwrap();
queue
.queue_with_comparator(
make_dummy_comparator(&req_prio, Some(1)),
ParticipationPriority::Priority,
req_prio.clone(),
)
.unwrap(); .unwrap();
// Insert same best effort again: // Insert same best effort again:
queue.queue_with_comparator(None, req1.clone()).unwrap(); queue
.queue_with_comparator(
make_dummy_comparator(&req1, None),
ParticipationPriority::BestEffort,
req1.clone(),
)
.unwrap();
// insert same prio again: // insert same prio again:
queue queue
.queue_with_comparator(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone()) .queue_with_comparator(
make_dummy_comparator(&req_prio, Some(1)),
ParticipationPriority::Priority,
req_prio.clone(),
)
.unwrap(); .unwrap();
// Insert first as best effort: // Insert first as best effort:
queue.queue_with_comparator(None, req_best_effort_then_prio.clone()).unwrap(); queue
.queue_with_comparator(
make_dummy_comparator(&req_best_effort_then_prio, Some(2)),
ParticipationPriority::BestEffort,
req_best_effort_then_prio.clone(),
)
.unwrap();
// Then as prio: // Then as prio:
queue queue
.queue_with_comparator( .queue_with_comparator(
Some(make_dummy_comparator(&req_best_effort_then_prio, 2)), make_dummy_comparator(&req_best_effort_then_prio, Some(2)),
ParticipationPriority::Priority,
req_best_effort_then_prio.clone(), req_best_effort_then_prio.clone(),
) )
.unwrap(); .unwrap();
@@ -116,12 +178,19 @@ fn candidate_is_only_dequeued_once() {
// Insert first as prio: // Insert first as prio:
queue queue
.queue_with_comparator( .queue_with_comparator(
Some(make_dummy_comparator(&req_prio_then_best_effort, 3)), make_dummy_comparator(&req_prio_then_best_effort, Some(3)),
ParticipationPriority::Priority,
req_prio_then_best_effort.clone(), req_prio_then_best_effort.clone(),
) )
.unwrap(); .unwrap();
// Then as best effort: // Then as best effort:
queue.queue_with_comparator(None, req_prio_then_best_effort.clone()).unwrap(); queue
.queue_with_comparator(
make_dummy_comparator(&req_prio_then_best_effort, Some(3)),
ParticipationPriority::BestEffort,
req_prio_then_best_effort.clone(),
)
.unwrap();
assert_eq!(queue.dequeue(), Some(req_best_effort_then_prio)); assert_eq!(queue.dequeue(), Some(req_best_effort_then_prio));
assert_eq!(queue.dequeue(), Some(req_prio_then_best_effort)); assert_eq!(queue.dequeue(), Some(req_prio_then_best_effort));
@@ -29,7 +29,10 @@ use parity_scale_codec::Encode;
use polkadot_node_primitives::{AvailableData, BlockData, InvalidCandidate, PoV}; use polkadot_node_primitives::{AvailableData, BlockData, InvalidCandidate, PoV};
use polkadot_node_subsystem::{ use polkadot_node_subsystem::{
jaeger, jaeger,
messages::{AllMessages, DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest}, messages::{
AllMessages, ChainApiMessage, DisputeCoordinatorMessage, RuntimeApiMessage,
RuntimeApiRequest,
},
ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SpawnGlue, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SpawnGlue,
}; };
use polkadot_node_subsystem_test_helpers::{ use polkadot_node_subsystem_test_helpers::{
@@ -221,9 +224,9 @@ fn same_req_wont_get_queued_if_participation_is_already_running() {
#[test] #[test]
fn reqs_get_queued_when_out_of_capacity() { fn reqs_get_queued_when_out_of_capacity() {
futures::executor::block_on(async { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new());
let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new());
let test = async {
let (sender, mut worker_receiver) = mpsc::channel(1); let (sender, mut worker_receiver) = mpsc::channel(1);
let mut participation = Participation::new(sender); let mut participation = Participation::new(sender);
activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
@@ -239,43 +242,81 @@ fn reqs_get_queued_when_out_of_capacity() {
} }
for _ in 0..MAX_PARALLEL_PARTICIPATIONS + 1 { for _ in 0..MAX_PARALLEL_PARTICIPATIONS + 1 {
assert_matches!(
ctx_handle.recv().await,
AllMessages::AvailabilityRecovery(
AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, tx)
) => {
tx.send(Err(RecoveryError::Unavailable)).unwrap();
},
"overseer did not receive recover available data message",
);
let result = participation let result = participation
.get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap())
.await .await
.unwrap(); .unwrap();
assert_matches!( assert_matches!(
result.outcome, result.outcome,
ParticipationOutcome::Unavailable => {} ParticipationOutcome::Unavailable => {}
); );
} }
// we should not have any further recovery requests:
// we should not have any further results nor recovery requests:
assert_matches!(ctx_handle.recv().timeout(Duration::from_millis(10)).await, None);
assert_matches!(worker_receiver.next().timeout(Duration::from_millis(10)).await, None); assert_matches!(worker_receiver.next().timeout(Duration::from_millis(10)).await, None);
}) };
let request_handler = async {
let mut recover_available_data_msg_count = 0;
let mut block_number_msg_count = 0;
while recover_available_data_msg_count < MAX_PARALLEL_PARTICIPATIONS + 1 ||
block_number_msg_count < 1
{
match ctx_handle.recv().await {
AllMessages::AvailabilityRecovery(
AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, tx),
) => {
tx.send(Err(RecoveryError::Unavailable)).unwrap();
recover_available_data_msg_count += 1;
},
AllMessages::ChainApi(ChainApiMessage::BlockNumber(_, tx)) => {
tx.send(Ok(None)).unwrap();
block_number_msg_count += 1;
},
_ => assert!(false, "Received unexpected message"),
}
}
// we should not have any further results
assert_matches!(ctx_handle.recv().timeout(Duration::from_millis(10)).await, None);
};
futures::executor::block_on(async {
futures::join!(test, request_handler);
});
} }
#[test] #[test]
fn reqs_get_queued_on_no_recent_block() { fn reqs_get_queued_on_no_recent_block() {
futures::executor::block_on(async { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new());
let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (mut unblock_test, mut wait_for_verification) = mpsc::channel(0);
let test = async {
let (sender, _worker_receiver) = mpsc::channel(1); let (sender, _worker_receiver) = mpsc::channel(1);
let mut participation = Participation::new(sender); let mut participation = Participation::new(sender);
participate(&mut ctx, &mut participation).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap();
assert!(ctx_handle.recv().timeout(Duration::from_millis(10)).await.is_none());
// We have initiated participation but we'll block `active_leaf` so that we can check that
// the participation is queued in race-free way
let _ = wait_for_verification.next().await.unwrap();
activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
};
// Responds to messages from the test and verifies its behaviour
let request_handler = async {
// If we receive `BlockNumber` request this implicitly proves that the participation is queued
assert_matches!(
ctx_handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::BlockNumber(_, tx)) => {
tx.send(Ok(None)).unwrap();
},
"overseer did not receive `ChainApiMessage::BlockNumber` message",
);
assert!(ctx_handle.recv().timeout(Duration::from_millis(10)).await.is_none());
// No activity so the participation is queued => unblock the test
unblock_test.send(()).await.unwrap();
// after activating at least one leaf the recent block // after activating at least one leaf the recent block
// state should be available which should lead to trying // state should be available which should lead to trying
@@ -288,7 +329,11 @@ fn reqs_get_queued_on_no_recent_block() {
)), )),
"overseer did not receive recover available data message", "overseer did not receive recover available data message",
); );
}) };
futures::executor::block_on(async {
futures::join!(test, request_handler);
});
} }
#[test] #[test]
@@ -57,7 +57,9 @@ use polkadot_node_subsystem::{
messages::{AllMessages, BlockDescription, RuntimeApiMessage, RuntimeApiRequest}, messages::{AllMessages, BlockDescription, RuntimeApiMessage, RuntimeApiRequest},
ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus,
}; };
use polkadot_node_subsystem_test_helpers::{make_subsystem_context, TestSubsystemContextHandle}; use polkadot_node_subsystem_test_helpers::{
make_buffered_subsystem_context, TestSubsystemContextHandle,
};
use polkadot_primitives::v2::{ use polkadot_primitives::v2::{
ApprovalVote, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, ApprovalVote, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash,
CandidateReceipt, CoreIndex, DisputeStatement, GroupIndex, Hash, HeadData, Header, IndexedVec, CandidateReceipt, CoreIndex, DisputeStatement, GroupIndex, Hash, HeadData, Header, IndexedVec,
@@ -382,6 +384,10 @@ impl TestState {
); );
gum::trace!("After answering runtime API request (votes)"); gum::trace!("After answering runtime API request (votes)");
}, },
AllMessages::ChainApi(ChainApiMessage::BlockNumber(hash, tx)) => {
let block_num = self.headers.get(&hash).map(|header| header.number);
tx.send(Ok(block_num)).unwrap();
},
msg => { msg => {
panic!("Received unexpected message in `handle_sync_queries`: {:?}", msg); panic!("Received unexpected message in `handle_sync_queries`: {:?}", msg);
}, },
@@ -521,7 +527,7 @@ impl TestState {
F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, TestState>, F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, TestState>,
{ {
self.known_session = None; self.known_session = None;
let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new()); let (ctx, ctx_handle) = make_buffered_subsystem_context(TaskExecutor::new(), 1);
let subsystem = DisputeCoordinatorSubsystem::new( let subsystem = DisputeCoordinatorSubsystem::new(
self.db.clone(), self.db.clone(),
self.config.clone(), self.config.clone(),
@@ -2838,6 +2844,12 @@ fn negative_issue_local_statement_only_triggers_import() {
}) })
.await; .await;
// Assert that subsystem is not participating.
assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
let backend = DbBackend::new( let backend = DbBackend::new(
test_state.db.clone(), test_state.db.clone(),
test_state.config.column_config(), test_state.config.column_config(),
@@ -2851,12 +2863,6 @@ fn negative_issue_local_statement_only_triggers_import() {
let disputes = backend.load_recent_disputes().unwrap(); let disputes = backend.load_recent_disputes().unwrap();
assert_eq!(disputes, None); assert_eq!(disputes, None);
// Assert that subsystem is not participating.
assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state test_state
}) })
}); });
@@ -51,7 +51,7 @@ use polkadot_primitives::v2::{
CandidateHash, CoreState, GroupIndex, Hash, Id as ParaId, ScheduledCore, SessionInfo, CandidateHash, CoreState, GroupIndex, Hash, Id as ParaId, ScheduledCore, SessionInfo,
ValidatorIndex, ValidatorIndex,
}; };
use test_helpers::{mock::make_ferdie_keystore, SingleItemSink}; use test_helpers::mock::make_ferdie_keystore;
use super::mock::{make_session_info, OccupiedCoreBuilder}; use super::mock::{make_session_info, OccupiedCoreBuilder};
use crate::LOG_TARGET; use crate::LOG_TARGET;
@@ -295,7 +295,7 @@ impl TestState {
} }
async fn overseer_signal( async fn overseer_signal(
mut tx: SingleItemSink<FromOrchestra<AvailabilityDistributionMessage>>, mut tx: mpsc::Sender<FromOrchestra<AvailabilityDistributionMessage>>,
msg: impl Into<OverseerSignal>, msg: impl Into<OverseerSignal>,
) { ) {
let msg = msg.into(); let msg = msg.into();
@@ -177,7 +177,7 @@ where
/// A test subsystem context. /// A test subsystem context.
pub struct TestSubsystemContext<M, S> { pub struct TestSubsystemContext<M, S> {
tx: TestSubsystemSender, tx: TestSubsystemSender,
rx: SingleItemStream<FromOrchestra<M>>, rx: mpsc::Receiver<FromOrchestra<M>>,
spawn: S, spawn: S,
} }
@@ -239,7 +239,7 @@ pub struct TestSubsystemContextHandle<M> {
/// ///
/// Useful for shared ownership situations (one can have multiple senders, but only one /// Useful for shared ownership situations (one can have multiple senders, but only one
/// receiver. /// receiver.
pub tx: SingleItemSink<FromOrchestra<M>>, pub tx: mpsc::Sender<FromOrchestra<M>>,
/// Direct access to the receiver. /// Direct access to the receiver.
pub rx: mpsc::UnboundedReceiver<AllMessages>, pub rx: mpsc::UnboundedReceiver<AllMessages>,
@@ -280,11 +280,22 @@ impl<M> TestSubsystemContextHandle<M> {
} }
} }
/// Make a test subsystem context. /// Make a test subsystem context with `buffer_size == 0`. This is used by most
/// of the tests.
pub fn make_subsystem_context<M, S>( pub fn make_subsystem_context<M, S>(
spawner: S, spawner: S,
) -> (TestSubsystemContext<M, SpawnGlue<S>>, TestSubsystemContextHandle<M>) { ) -> (TestSubsystemContext<M, SpawnGlue<S>>, TestSubsystemContextHandle<M>) {
let (overseer_tx, overseer_rx) = single_item_sink(); make_buffered_subsystem_context(spawner, 0)
}
/// Make a test subsystem context with buffered overseer channel. Some tests (e.g.
/// `dispute-coordinator`) create too many parallel operations and deadlock unless
/// the channel is buffered. Usually `buffer_size=1` is enough.
pub fn make_buffered_subsystem_context<M, S>(
spawner: S,
buffer_size: usize,
) -> (TestSubsystemContext<M, SpawnGlue<S>>, TestSubsystemContextHandle<M>) {
let (overseer_tx, overseer_rx) = mpsc::channel(buffer_size);
let (all_messages_tx, all_messages_rx) = mpsc::unbounded(); let (all_messages_tx, all_messages_rx) = mpsc::unbounded();
( (
@@ -9,7 +9,7 @@ In particular the dispute-coordinator is responsible for:
- Ensuring that the node is able to raise a dispute in case an invalid candidate - Ensuring that the node is able to raise a dispute in case an invalid candidate
is found during approval checking. is found during approval checking.
- Ensuring lazy approval votes (votes given without running the parachain - Ensuring lazy approval votes (votes given without running the parachain
validation function) will be recorded, so lazy nodes can get slashed properly. validation function) will be recorded, so lazy nodes can get slashed properly.
- Coordinating actual participation in a dispute, ensuring that the node - Coordinating actual participation in a dispute, ensuring that the node
participates in any justified dispute in a way that ensures resolution of participates in any justified dispute in a way that ensures resolution of
@@ -84,18 +84,18 @@ While there is no need to record approval votes in the dispute coordinator
preemptively, we do need to make sure they are recorded when a dispute preemptively, we do need to make sure they are recorded when a dispute
actually happens. This is because only votes recorded by the dispute actually happens. This is because only votes recorded by the dispute
coordinator will be considered for slashing. It is sufficient for our coordinator will be considered for slashing. It is sufficient for our
threat model that malicious backers are slashed as opposed to both backers and threat model that malicious backers are slashed as opposed to both backers and
approval checkers. However, we still must import approval votes from the approvals approval checkers. However, we still must import approval votes from the approvals
process into the disputes process to ensure that lazy approval checkers process into the disputes process to ensure that lazy approval checkers
actually run the parachain validation function. Slashing lazy approval checkers is necessary, else we risk a useless approvals process where every approval actually run the parachain validation function. Slashing lazy approval checkers is necessary, else we risk a useless approvals process where every approval
checker blindly votes valid for every candidate. If we did not import approval checker blindly votes valid for every candidate. If we did not import approval
votes, lazy nodes would likely cast a properly checked explicit vote as part votes, lazy nodes would likely cast a properly checked explicit vote as part
of the dispute in addition to their blind approval vote and thus avoid a slash. of the dispute in addition to their blind approval vote and thus avoid a slash.
With the 2/3rd honest assumption it seems unrealistic that lazy approval voters With the 2/3rd honest assumption it seems unrealistic that lazy approval voters
will keep sending unchecked approval votes once they became aware of a raised will keep sending unchecked approval votes once they became aware of a raised
dispute. Hence the most crucial approval votes to import are the early ones dispute. Hence the most crucial approval votes to import are the early ones
(tranche 0), to take into account network latencies and such we still want to (tranche 0), to take into account network latencies and such we still want to
import approval votes at a later point in time as well (in particular we need import approval votes at a later point in time as well (in particular we need
to make sure the dispute can conclude, but more on that later). to make sure the dispute can conclude, but more on that later).
As mentioned already previously, importing votes is most efficient when batched. As mentioned already previously, importing votes is most efficient when batched.
@@ -202,11 +202,11 @@ time participation is faster than approval, a node would do double work.
### Ensuring Chain Import ### Ensuring Chain Import
While in the previous section we discussed means for nodes to ensure relevant While in the previous section we discussed means for nodes to ensure relevant
votes are recorded so lazy approval checkers get slashed properly, it is crucial votes are recorded so lazy approval checkers get slashed properly, it is crucial
to also discuss the actual chain import. Only if we guarantee that recorded votes to also discuss the actual chain import. Only if we guarantee that recorded votes
will also get imported on chain (on all potential chains really) we will succeed will also get imported on chain (on all potential chains really) we will succeed
in executing slashes. Particularly we need to make sure backing votes end up on in executing slashes. Particularly we need to make sure backing votes end up on
chain consistantly. In contrast recording and slashing lazy approval voters only chain consistantly. In contrast recording and slashing lazy approval voters only
needs to be likely, not certain. needs to be likely, not certain.
Dispute distribution will make sure all explicit dispute votes get distributed Dispute distribution will make sure all explicit dispute votes get distributed
@@ -227,14 +227,14 @@ production in the current set - they might only exist on an already abandoned
fork. This means a block producer that just joined the set, might not have seen fork. This means a block producer that just joined the set, might not have seen
any of them. any of them.
For approvals it is even more tricky and less necessary: Approval voting together For approvals it is even more tricky and less necessary: Approval voting together
with finalization is a completely off-chain process therefore those protocols with finalization is a completely off-chain process therefore those protocols
don't care about block production at all. Approval votes only have a guarantee of don't care about block production at all. Approval votes only have a guarantee of
being propagated between the nodes that are responsible for finalizing the being propagated between the nodes that are responsible for finalizing the
concerned blocks. This implies that on an era change the current authority set, concerned blocks. This implies that on an era change the current authority set,
will not necessarily get informed about any approval votes for the previous era. will not necessarily get informed about any approval votes for the previous era.
Hence even if all validators of the previous era successfully recorded all approval Hence even if all validators of the previous era successfully recorded all approval
votes in the dispute coordinator, they won't get a chance to put them on chain, votes in the dispute coordinator, they won't get a chance to put them on chain,
hence they won't be considered for slashing. hence they won't be considered for slashing.
It is important to note, that the essential properties of the system still hold: It is important to note, that the essential properties of the system still hold:
@@ -359,14 +359,19 @@ times instead of just once to the oldest offender. This is obviously a good
idea, in particular it makes it impossible for an attacker to prevent rolling idea, in particular it makes it impossible for an attacker to prevent rolling
back a very old candidate, by keeping raising disputes for newer candidates. back a very old candidate, by keeping raising disputes for newer candidates.
For candidates we have not seen included, but we have our availability piece For candidates we have not seen included, but we know are backed (thanks to chain
available we put participation on a best-effort queue, which at the moment is scraping) or we have seen a dispute with 1/3+1 participation (confirmed dispute)
processed on the basis how often we requested participation locally, which on them - we put participation on a best-effort queue. It has got the same
equals the number of times we imported votes for that dispute. The idea is, if ordering as the priority one - by block heights of the relay parent, older blocks
we have not seen the candidate included, but the dispute is valid, other nodes are with priority. There is a possibility not to be able to obtain the block number
will have seen it included - so the more votes there are, the more likely it is of the parent when we are inserting the dispute in the queue. The reason for this
a valid dispute and we should implicitly arrive at a similar ordering as the is either the dispute is completely made up or we are out of sync with the other
nodes that are able to sort based on the relay parent block height. nodes in terms of last finalized block. The former is very unlikely. If we are
adding a dispute in best-effort it should already be either confirmed or the
candidate is backed. In the latter case we will promote the dispute to the
priority queue once we learn about the new block. NOTE: this is still work in
progress and is tracked by [this issue]
(https://github.com/paritytech/polkadot/issues/5875).
#### Import #### Import
@@ -381,6 +386,12 @@ dispute coordinator level (dispute-distribution also has its own), which is spam
slots. For each import, where we don't know whether it might be spam or not we slots. For each import, where we don't know whether it might be spam or not we
increment a counter for each signing participant of explicit `invalid` votes. increment a counter for each signing participant of explicit `invalid` votes.
What votes do we treat as a potential spam? A vote will increase a spam slot if
and only if all of the following condidions are satisfied:
* the candidate under dispute is not included on any chain
* the dispute is not confirmed
* we haven't casted a vote for the dispute
The reason this works is because we only need to worry about actual dispute The reason this works is because we only need to worry about actual dispute
votes. Import of backing votes are already rate limited and concern only real votes. Import of backing votes are already rate limited and concern only real
candidates for approval votes a similar argument holds (if they come from candidates for approval votes a similar argument holds (if they come from