mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 01:41:09 +00:00
Batch vote import in dispute-distribution (#5894)
* Start work on batching in dispute-distribution.
* Guide work.
* More guide changes. Still very much WIP.
* Finish guide changes.
* Clarification
* Adjust argument about slashing.
* WIP: Add constants to receiver.
* Maintain order of disputes.
* dispute-distribution sender Rate limit.
* Cleanup
* WIP: dispute-distribution receiver.
  - [ ] Rate limiting
  - [ ] Batching
* WIP: Batching.
* fmt
* Update `PeerQueues` to maintain more invariants.
* WIP: Batching.
* Small cleanup
* Batching logic.
* Some integration work.
* Finish. Missing: Tests
* Typo.
* Docs.
* Report missing metric.
* Doc pass.
* Tests for waiting_queue.
* Speed up some crypto by 10x.
* Fix redundant import.
* Add some tracing.
* Better sender rate limit
* Some tests.
* Tests
* Add logging to rate limiter
* Update roadmap/implementers-guide/src/node/disputes/dispute-distribution.md Co-authored-by: Tsvetomir Dimitrov <tsvetomir@parity.io>
* Update roadmap/implementers-guide/src/node/disputes/dispute-distribution.md Co-authored-by: Tsvetomir Dimitrov <tsvetomir@parity.io>
* Update node/network/dispute-distribution/src/receiver/mod.rs Co-authored-by: Tsvetomir Dimitrov <tsvetomir@parity.io>
* Review feedback.
* Also log peer in log messages.
* Fix indentation.
* waker -> timer
* Guide improvement.
* Remove obsolete comment.
* waker -> timer
* Fix spell complaints.
* Fix Cargo.lock

Co-authored-by: Tsvetomir Dimitrov <tsvetomir@parity.io>
Generated
+1
@@ -6334,6 +6334,7 @@ dependencies = [
|
||||
"fatality",
|
||||
"futures",
|
||||
"futures-timer",
|
||||
"indexmap",
|
||||
"lazy_static",
|
||||
"lru 0.8.0",
|
||||
"parity-scale-codec",
|
||||
|
||||
+4
-3
@@ -125,9 +125,9 @@ maintenance = { status = "actively-developed" }
|
||||
#
|
||||
# This list is ordered alphabetically.
|
||||
[profile.dev.package]
|
||||
blake2b_simd = { opt-level = 3 }
|
||||
blake2 = { opt-level = 3 }
|
||||
blake2-rfc = { opt-level = 3 }
|
||||
blake2b_simd = { opt-level = 3 }
|
||||
chacha20poly1305 = { opt-level = 3 }
|
||||
cranelift-codegen = { opt-level = 3 }
|
||||
cranelift-wasm = { opt-level = 3 }
|
||||
@@ -138,8 +138,8 @@ curve25519-dalek = { opt-level = 3 }
|
||||
ed25519-dalek = { opt-level = 3 }
|
||||
flate2 = { opt-level = 3 }
|
||||
futures-channel = { opt-level = 3 }
|
||||
hashbrown = { opt-level = 3 }
|
||||
hash-db = { opt-level = 3 }
|
||||
hashbrown = { opt-level = 3 }
|
||||
hmac = { opt-level = 3 }
|
||||
httparse = { opt-level = 3 }
|
||||
integer-sqrt = { opt-level = 3 }
|
||||
@@ -151,8 +151,8 @@ libz-sys = { opt-level = 3 }
|
||||
mio = { opt-level = 3 }
|
||||
nalgebra = { opt-level = 3 }
|
||||
num-bigint = { opt-level = 3 }
|
||||
parking_lot_core = { opt-level = 3 }
|
||||
parking_lot = { opt-level = 3 }
|
||||
parking_lot_core = { opt-level = 3 }
|
||||
percent-encoding = { opt-level = 3 }
|
||||
primitive-types = { opt-level = 3 }
|
||||
reed-solomon-novelpoly = { opt-level = 3 }
|
||||
@@ -162,6 +162,7 @@ sha2 = { opt-level = 3 }
|
||||
sha3 = { opt-level = 3 }
|
||||
smallvec = { opt-level = 3 }
|
||||
snow = { opt-level = 3 }
|
||||
substrate-bip39 = {opt-level = 3}
|
||||
twox-hash = { opt-level = 3 }
|
||||
uint = { opt-level = 3 }
|
||||
wasmi = { opt-level = 3 }
|
||||
|
||||
@@ -6,6 +6,7 @@ edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
futures = "0.3.21"
|
||||
futures-timer = "3.0.2"
|
||||
gum = { package = "tracing-gum", path = "../../gum" }
|
||||
derive_more = "0.99.17"
|
||||
parity-scale-codec = { version = "3.1.5", features = ["std"] }
|
||||
@@ -21,6 +22,7 @@ sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "maste
|
||||
thiserror = "1.0.31"
|
||||
fatality = "0.0.6"
|
||||
lru = "0.8.0"
|
||||
indexmap = "1.9.1"
|
||||
|
||||
[dev-dependencies]
|
||||
async-trait = "0.1.57"
|
||||
|
||||
@@ -24,7 +24,7 @@
|
||||
//! The sender is responsible for getting our vote out, see [`sender`]. The receiver handles
|
||||
//! incoming [`DisputeRequest`]s and offers spam protection, see [`receiver`].
|
||||
|
||||
use std::num::NonZeroUsize;
|
||||
use std::{num::NonZeroUsize, time::Duration};
|
||||
|
||||
use futures::{channel::mpsc, FutureExt, StreamExt, TryFutureExt};
|
||||
|
||||
@@ -66,16 +66,19 @@ use self::sender::{DisputeSender, TaskFinish};
|
||||
/// via a dedicated channel and forwarding them to the dispute coordinator via
|
||||
/// `DisputeCoordinatorMessage::ImportStatements`. Being the interface to the network and untrusted
|
||||
/// nodes, the reality is not that simple of course. Before importing statements the receiver will
|
||||
/// make sure as good as it can to filter out malicious/unwanted/spammy requests. For this it does
|
||||
/// the following:
|
||||
/// batch up imports as well as possible for efficient imports while maintaining timely dispute
|
||||
/// resolution and handling of spamming validators:
|
||||
///
|
||||
/// - Drop all messages from non validator nodes, for this it requires the [`AuthorityDiscovery`]
|
||||
/// service.
|
||||
/// - Drop messages from a node, if we are already importing a message from that node (flood).
|
||||
/// - Drop messages from nodes, that provided us messages where the statement import failed.
|
||||
/// - Drop messages from a node if it sends at too high a rate.
|
||||
/// - Filter out duplicate messages (over some period of time).
|
||||
/// - Drop any obviously invalid votes (invalid signatures for example).
|
||||
/// - Ban peers whose votes were deemed invalid.
|
||||
///
|
||||
/// In general dispute-distribution works on limiting the work the dispute-coordinator will have to
|
||||
/// do, while at the same time making it aware of new disputes as fast as possible.
|
||||
///
|
||||
/// For successfully imported votes, we will confirm the receipt of the message back to the sender.
|
||||
/// This way a received confirmation guarantees, that the vote has been stored to disk by the
|
||||
/// receiver.
|
||||
@@ -95,6 +98,20 @@ pub use metrics::Metrics;
|
||||
|
||||
const LOG_TARGET: &'static str = "parachain::dispute-distribution";
|
||||
|
||||
/// Rate limit on the `receiver` side.
|
||||
///
|
||||
/// If messages from one peer come in at a higher rate than one every `RECEIVE_RATE_LIMIT` on average, we
|
||||
/// start dropping messages from that peer to enforce that limit.
|
||||
pub const RECEIVE_RATE_LIMIT: Duration = Duration::from_millis(100);
|
||||
|
||||
/// Rate limit on the `sender` side.
|
||||
///
|
||||
/// In order to not hit the `RECEIVE_RATE_LIMIT` on the receiving side, we limit our sending rate as
|
||||
/// well.
|
||||
///
|
||||
/// We add 50ms extra, just to have some safety margin over the `RECEIVE_RATE_LIMIT`.
|
||||
pub const SEND_RATE_LIMIT: Duration = RECEIVE_RATE_LIMIT.saturating_add(Duration::from_millis(50));
|
||||
|
||||
/// The dispute distribution subsystem.
|
||||
pub struct DisputeDistributionSubsystem<AD> {
|
||||
/// Easy and efficient runtime access for this subsystem.
|
||||
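The relationship between the two rate-limit constants in the hunk above can be checked with a small standalone sketch. The constants are copied from the diff; `remaining_wait` and `receiver_accepts` are hypothetical helpers invented for illustration, and `receiver_accepts` simplifies the "on average" rule from the doc comment to a plain per-message gap check.

```rust
use std::time::Duration;

// Copied from the diff above.
const RECEIVE_RATE_LIMIT: Duration = Duration::from_millis(100);
const SEND_RATE_LIMIT: Duration = RECEIVE_RATE_LIMIT.saturating_add(Duration::from_millis(50));

/// Hypothetical helper: how long a sender still has to wait before its next
/// send, given the time elapsed since the previous one.
fn remaining_wait(since_last_send: Duration) -> Duration {
    SEND_RATE_LIMIT.saturating_sub(since_last_send)
}

/// Hypothetical helper: simplified receiver check that only looks at the gap
/// between two consecutive messages (the real receiver limits the average rate).
fn receiver_accepts(gap: Duration) -> bool {
    gap >= RECEIVE_RATE_LIMIT
}
```

A sender that spaces its messages by `SEND_RATE_LIMIT` (150ms) stays on the accepted side of this simplified check with 50ms to spare.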
@@ -175,6 +192,12 @@ where
|
||||
ctx.spawn("disputes-receiver", receiver.run().boxed())
|
||||
.map_err(FatalError::SpawnTask)?;
|
||||
|
||||
// Process messages for sending side.
|
||||
//
|
||||
// Note: We want the sender to be rate limited and we are currently taking advantage of the
|
||||
// fact that the root task of this subsystem is only concerned with sending: Functions of
|
||||
// `DisputeSender` might back pressure if the rate limit is hit, which will slow down this
|
||||
// loop. If this fact ever changes, we will likely need another task.
|
||||
loop {
|
||||
let message = MuxedMessage::receive(&mut ctx, &mut self.sender_rx).await;
|
||||
match message {
|
||||
@@ -250,9 +273,10 @@ impl MuxedMessage {
|
||||
// ends.
|
||||
let from_overseer = ctx.recv().fuse();
|
||||
futures::pin_mut!(from_overseer, from_sender);
|
||||
futures::select!(
|
||||
msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
|
||||
// We select biased to make sure we finish up loose ends, before starting new work.
|
||||
futures::select_biased!(
|
||||
msg = from_sender.next() => MuxedMessage::Sender(msg),
|
||||
msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,9 +72,12 @@ impl Metrics {
|
||||
}
|
||||
|
||||
/// Statements have been imported.
|
||||
pub fn on_imported(&self, label: &'static str) {
|
||||
pub fn on_imported(&self, label: &'static str, num_requests: usize) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.imported_requests.with_label_values(&[label]).inc()
|
||||
metrics
|
||||
.imported_requests
|
||||
.with_label_values(&[label])
|
||||
.inc_by(num_requests as u64)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,209 @@
|
||||
// Copyright 2022 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::{collections::HashMap, time::Instant};
|
||||
|
||||
use gum::CandidateHash;
|
||||
use polkadot_node_network_protocol::{
|
||||
request_response::{incoming::OutgoingResponseSender, v1::DisputeRequest},
|
||||
PeerId,
|
||||
};
|
||||
use polkadot_node_primitives::SignedDisputeStatement;
|
||||
use polkadot_primitives::v2::{CandidateReceipt, ValidatorIndex};
|
||||
|
||||
use crate::receiver::{BATCH_COLLECTING_INTERVAL, MIN_KEEP_BATCH_ALIVE_VOTES};
|
||||
|
||||
use super::MAX_BATCH_LIFETIME;
|
||||
|
||||
/// A batch of votes to be imported into the `dispute-coordinator`.
|
||||
///
|
||||
/// Vote imports are way more efficient when performed in batches, hence we batch together incoming
|
||||
/// votes until the rate of incoming votes falls below a threshold, then we import into the dispute
|
||||
/// coordinator.
|
||||
///
|
||||
/// A `Batch` keeps track of the votes to be imported and the current incoming rate. On rate update
|
||||
/// it will "flush" in case the incoming rate dropped too low, preparing the import.
|
||||
pub struct Batch {
|
||||
/// The actual candidate this batch is concerned with.
|
||||
candidate_receipt: CandidateReceipt,
|
||||
|
||||
/// Cache of `CandidateHash` (candidate_receipt.hash()).
|
||||
candidate_hash: CandidateHash,
|
||||
|
||||
/// All valid votes received in this batch so far.
|
||||
///
|
||||
/// We differentiate between valid and invalid votes, so we can detect (and drop) duplicates,
|
||||
/// while still allowing validators to equivocate.
|
||||
///
|
||||
/// Detecting and rejecting duplicates is crucial in order to effectively enforce
|
||||
/// `MIN_KEEP_BATCH_ALIVE_VOTES` per `BATCH_COLLECTING_INTERVAL`. If we would count duplicates
|
||||
/// here, the mechanism would be broken.
|
||||
valid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,
|
||||
|
||||
/// All invalid votes received in this batch so far.
|
||||
invalid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,
|
||||
|
||||
/// How many votes have been batched since the last tick/creation.
|
||||
votes_batched_since_last_tick: u32,
|
||||
|
||||
/// Expiry time for the batch.
|
||||
///
|
||||
/// This batch will get flushed by this time at the latest.
|
||||
best_before: Instant,
|
||||
|
||||
/// Requesters waiting for a response.
|
||||
requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
|
||||
}
|
||||
|
||||
/// Result of checking a batch every `BATCH_COLLECTING_INTERVAL`.
|
||||
pub(super) enum TickResult {
|
||||
/// Batch is still alive, please call `tick` again at the given `Instant`.
|
||||
Alive(Batch, Instant),
|
||||
/// Batch is done, ready for import!
|
||||
Done(PreparedImport),
|
||||
}
|
||||
|
||||
/// Ready for import.
|
||||
pub struct PreparedImport {
|
||||
pub candidate_receipt: CandidateReceipt,
|
||||
pub statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
|
||||
/// Information about original requesters.
|
||||
pub requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
|
||||
}
|
||||
|
||||
impl From<Batch> for PreparedImport {
|
||||
fn from(batch: Batch) -> Self {
|
||||
let Batch {
|
||||
candidate_receipt,
|
||||
valid_votes,
|
||||
invalid_votes,
|
||||
requesters: pending_responses,
|
||||
..
|
||||
} = batch;
|
||||
|
||||
let statements = valid_votes
|
||||
.into_iter()
|
||||
.chain(invalid_votes.into_iter())
|
||||
.map(|(index, statement)| (statement, index))
|
||||
.collect();
|
||||
|
||||
Self { candidate_receipt, statements, requesters: pending_responses }
|
||||
}
|
||||
}
|
||||
|
||||
impl Batch {
|
||||
/// Create a new empty batch based on the given `CandidateReceipt`.
|
||||
///
|
||||
/// To create a `Batch` use `Batches::find_batch`.
|
||||
///
|
||||
/// Arguments:
|
||||
///
|
||||
/// * `candidate_receipt` - The candidate this batch is meant to track votes for.
|
||||
/// * `now` - current time stamp for calculating the first tick.
|
||||
///
|
||||
/// Returns: A batch and the first `Instant` you are supposed to call `tick`.
|
||||
pub(super) fn new(candidate_receipt: CandidateReceipt, now: Instant) -> (Self, Instant) {
|
||||
let s = Self {
|
||||
candidate_hash: candidate_receipt.hash(),
|
||||
candidate_receipt,
|
||||
valid_votes: HashMap::new(),
|
||||
invalid_votes: HashMap::new(),
|
||||
votes_batched_since_last_tick: 0,
|
||||
best_before: Instant::now() + MAX_BATCH_LIFETIME,
|
||||
requesters: Vec::new(),
|
||||
};
|
||||
let next_tick = s.calculate_next_tick(now);
|
||||
(s, next_tick)
|
||||
}
|
||||
|
||||
/// Receipt of the candidate this batch is batching votes for.
|
||||
pub fn candidate_receipt(&self) -> &CandidateReceipt {
|
||||
&self.candidate_receipt
|
||||
}
|
||||
|
||||
/// Add votes from a validator into the batch.
|
||||
///
|
||||
/// The statements are supposed to be the valid and invalid statements received in a
|
||||
/// `DisputeRequest`.
|
||||
///
|
||||
/// The given `pending_response` is the corresponding response sender for responding to `peer`.
|
||||
/// If at least one of the votes is new as far as this batch is concerned we record the
|
||||
/// pending_response, for later use. In case both votes are known already, we return the
|
||||
/// response sender as an `Err` value.
|
||||
pub fn add_votes(
|
||||
&mut self,
|
||||
valid_vote: (SignedDisputeStatement, ValidatorIndex),
|
||||
invalid_vote: (SignedDisputeStatement, ValidatorIndex),
|
||||
peer: PeerId,
|
||||
pending_response: OutgoingResponseSender<DisputeRequest>,
|
||||
) -> Result<(), OutgoingResponseSender<DisputeRequest>> {
|
||||
debug_assert!(valid_vote.0.candidate_hash() == invalid_vote.0.candidate_hash());
|
||||
debug_assert!(valid_vote.0.candidate_hash() == &self.candidate_hash);
|
||||
|
||||
let mut duplicate = true;
|
||||
|
||||
if self.valid_votes.insert(valid_vote.1, valid_vote.0).is_none() {
|
||||
self.votes_batched_since_last_tick += 1;
|
||||
duplicate = false;
|
||||
}
|
||||
if self.invalid_votes.insert(invalid_vote.1, invalid_vote.0).is_none() {
|
||||
self.votes_batched_since_last_tick += 1;
|
||||
duplicate = false;
|
||||
}
|
||||
|
||||
if duplicate {
|
||||
Err(pending_response)
|
||||
} else {
|
||||
self.requesters.push((peer, pending_response));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Check batch for liveness.
|
||||
///
|
||||
/// This function is supposed to be called at the instant given at construction and at those
|
||||
/// returned as part of `TickResult`.
|
||||
pub(super) fn tick(mut self, now: Instant) -> TickResult {
|
||||
if self.votes_batched_since_last_tick >= MIN_KEEP_BATCH_ALIVE_VOTES &&
|
||||
now < self.best_before
|
||||
{
|
||||
// Still good:
|
||||
let next_tick = self.calculate_next_tick(now);
|
||||
// Reset counter:
|
||||
self.votes_batched_since_last_tick = 0;
|
||||
TickResult::Alive(self, next_tick)
|
||||
} else {
|
||||
TickResult::Done(PreparedImport::from(self))
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculate when the next tick should happen.
|
||||
///
|
||||
/// This will usually return `now + BATCH_COLLECTING_INTERVAL`, except if the lifetime of this batch
|
||||
/// would exceed `MAX_BATCH_LIFETIME`.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `now` - The current time.
|
||||
fn calculate_next_tick(&self, now: Instant) -> Instant {
|
||||
let next_tick = now + BATCH_COLLECTING_INTERVAL;
|
||||
if next_tick < self.best_before {
|
||||
next_tick
|
||||
} else {
|
||||
self.best_before
|
||||
}
|
||||
}
|
||||
}
|
||||
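The liveness rule implemented by `tick` and `calculate_next_tick` above can be sketched in isolation. This is a minimal stand-in, not the crate's code: `MiniBatch` and `MiniTick` are hypothetical names, the votes themselves are left out, and the three constant values are placeholders chosen for illustration (the real values are defined outside this diff excerpt).

```rust
use std::time::{Duration, Instant};

// Placeholder values for illustration only; the real constants live elsewhere.
const BATCH_COLLECTING_INTERVAL: Duration = Duration::from_millis(500);
const MIN_KEEP_BATCH_ALIVE_VOTES: u32 = 10;
const MAX_BATCH_LIFETIME: Duration = Duration::from_secs(10);

/// Hypothetical stand-in for `Batch`: only the liveness bookkeeping.
struct MiniBatch {
    votes_since_last_tick: u32,
    best_before: Instant,
}

/// Hypothetical stand-in for `TickResult`.
enum MiniTick {
    Alive(MiniBatch, Instant),
    Done,
}

impl MiniBatch {
    /// Returns the batch and the first instant at which `tick` should be called.
    fn new(now: Instant) -> (Self, Instant) {
        let batch = Self { votes_since_last_tick: 0, best_before: now + MAX_BATCH_LIFETIME };
        let next = batch.next_tick(now);
        (batch, next)
    }

    /// `now + BATCH_COLLECTING_INTERVAL`, capped at `best_before`.
    fn next_tick(&self, now: Instant) -> Instant {
        (now + BATCH_COLLECTING_INTERVAL).min(self.best_before)
    }

    /// Keep the batch alive only if enough new votes arrived and it has not expired.
    fn tick(mut self, now: Instant) -> MiniTick {
        if self.votes_since_last_tick >= MIN_KEEP_BATCH_ALIVE_VOTES && now < self.best_before {
            let next = self.next_tick(now);
            self.votes_since_last_tick = 0;
            MiniTick::Alive(self, next)
        } else {
            MiniTick::Done
        }
    }
}
```

Because the counter is reset on every surviving tick, a batch only stays open while votes keep arriving at the minimum rate, and `best_before` bounds its total lifetime regardless of that rate.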
@@ -0,0 +1,170 @@
|
||||
// Copyright 2022 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::{
|
||||
collections::{hash_map, HashMap},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use futures::future::pending;
|
||||
|
||||
use polkadot_node_network_protocol::request_response::DISPUTE_REQUEST_TIMEOUT;
|
||||
use polkadot_primitives::v2::{CandidateHash, CandidateReceipt};
|
||||
|
||||
use crate::{
|
||||
receiver::batches::{batch::TickResult, waiting_queue::PendingWake},
|
||||
LOG_TARGET,
|
||||
};
|
||||
|
||||
pub use self::batch::{Batch, PreparedImport};
|
||||
use self::waiting_queue::WaitingQueue;
|
||||
|
||||
use super::{
|
||||
error::{JfyiError, JfyiResult},
|
||||
BATCH_COLLECTING_INTERVAL,
|
||||
};
|
||||
|
||||
/// A single batch (per candidate) as managed by `Batches`.
|
||||
mod batch;
|
||||
|
||||
/// Queue events in time and wait for them to become ready.
|
||||
mod waiting_queue;
|
||||
|
||||
/// Safeguard in case votes trickle in really slowly.
|
||||
///
|
||||
/// If the batch lifetime exceeded the time the sender is willing to wait for a confirmation, we
|
||||
/// would trigger pointless re-sends.
|
||||
const MAX_BATCH_LIFETIME: Duration = DISPUTE_REQUEST_TIMEOUT.saturating_sub(Duration::from_secs(2));
|
||||
|
||||
/// Limit the number of batches that can be alive at any given time.
|
||||
///
|
||||
/// For the reasoning behind this number, see the guide.
|
||||
pub const MAX_BATCHES: usize = 1000;
|
||||
|
||||
/// Manage batches.
|
||||
///
|
||||
/// - Batches can be found via `find_batch()` in order to add votes to them/check they exist.
|
||||
/// - Batches can be checked for being ready for flushing in order to import contained votes.
|
||||
pub struct Batches {
|
||||
/// The batches we manage.
|
||||
///
|
||||
/// Kept invariants:
|
||||
/// For each entry in `batches`, there exists an entry in `waiting_queue` as well - we wait on
|
||||
/// all batches!
|
||||
batches: HashMap<CandidateHash, Batch>,
|
||||
/// Waiting queue for waiting for batches to become ready for `tick`.
|
||||
///
|
||||
/// Kept invariants by `Batches`:
|
||||
/// For each entry in the `waiting_queue` there exists a corresponding entry in `batches`.
|
||||
waiting_queue: WaitingQueue<CandidateHash>,
|
||||
}
|
||||
|
||||
/// A found batch is either really found or got created so it can be found.
|
||||
pub enum FoundBatch<'a> {
|
||||
/// Batch just got created.
|
||||
Created(&'a mut Batch),
|
||||
/// Batch already existed.
|
||||
Found(&'a mut Batch),
|
||||
}
|
||||
|
||||
impl Batches {
|
||||
/// Create new empty `Batches`.
|
||||
pub fn new() -> Self {
|
||||
debug_assert!(
|
||||
MAX_BATCH_LIFETIME > BATCH_COLLECTING_INTERVAL,
|
||||
"Unexpectedly low `MAX_BATCH_LIFETIME`, please check parameters."
|
||||
);
|
||||
Self { batches: HashMap::new(), waiting_queue: WaitingQueue::new() }
|
||||
}
|
||||
|
||||
/// Find a particular batch.
|
||||
///
|
||||
/// That is, we either find it or create it, as reflected by the resulting `FoundBatch`.
|
||||
pub fn find_batch(
|
||||
&mut self,
|
||||
candidate_hash: CandidateHash,
|
||||
candidate_receipt: CandidateReceipt,
|
||||
) -> JfyiResult<FoundBatch> {
|
||||
if self.batches.len() >= MAX_BATCHES {
|
||||
return Err(JfyiError::MaxBatchLimitReached)
|
||||
}
|
||||
debug_assert!(candidate_hash == candidate_receipt.hash());
|
||||
let result = match self.batches.entry(candidate_hash) {
|
||||
hash_map::Entry::Vacant(vacant) => {
|
||||
let now = Instant::now();
|
||||
let (created, ready_at) = Batch::new(candidate_receipt, now);
|
||||
let pending_wake = PendingWake { payload: candidate_hash, ready_at };
|
||||
self.waiting_queue.push(pending_wake);
|
||||
FoundBatch::Created(vacant.insert(created))
|
||||
},
|
||||
hash_map::Entry::Occupied(occupied) => FoundBatch::Found(occupied.into_mut()),
|
||||
};
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Wait for the next `tick` to check for ready batches.
|
||||
///
|
||||
/// This function blocks (returns `Poll::Pending`) until at least one batch can be
|
||||
/// checked for readiness meaning that `BATCH_COLLECTING_INTERVAL` has passed since the last
|
||||
/// check for that batch or it reached end of life.
|
||||
///
|
||||
/// If this `Batches` instance is empty (does not actually contain any batches), then this
|
||||
/// function will always return `Poll::Pending`.
|
||||
///
|
||||
/// Returns: A `Vec` of all `PreparedImport`s from batches that became ready.
|
||||
pub async fn check_batches(&mut self) -> Vec<PreparedImport> {
|
||||
let now = Instant::now();
|
||||
|
||||
let mut imports = Vec::new();
|
||||
|
||||
// Wait for at least one batch to become ready:
|
||||
self.waiting_queue.wait_ready(now).await;
|
||||
|
||||
// Process all ready entries:
|
||||
while let Some(wake) = self.waiting_queue.pop_ready(now) {
|
||||
let batch = self.batches.remove(&wake.payload);
|
||||
debug_assert!(
|
||||
batch.is_some(),
|
||||
"Entries referenced in `waiting_queue` are supposed to exist!"
|
||||
);
|
||||
let batch = match batch {
|
||||
None => return pending().await,
|
||||
Some(batch) => batch,
|
||||
};
|
||||
match batch.tick(now) {
|
||||
TickResult::Done(import) => {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
candidate_hash = ?wake.payload,
|
||||
"Batch became ready."
|
||||
);
|
||||
imports.push(import);
|
||||
},
|
||||
TickResult::Alive(old_batch, next_tick) => {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
candidate_hash = ?wake.payload,
|
||||
"Batch found to be still alive on check."
|
||||
);
|
||||
let pending_wake = PendingWake { payload: wake.payload, ready_at: next_tick };
|
||||
self.waiting_queue.push(pending_wake);
|
||||
self.batches.insert(wake.payload, old_batch);
|
||||
},
|
||||
}
|
||||
}
|
||||
imports
|
||||
}
|
||||
}
|
||||
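The find-or-create pattern used by `find_batch` above can be sketched with plain standard-library types. Everything here is a simplified stand-in: `Registry`, `Found`, the `u64` key and the `Vec<u32>` value are hypothetical, the waiting queue is omitted, and `MAX_BATCHES` is set to a tiny value so the limit is easy to hit.

```rust
use std::collections::{hash_map::Entry, HashMap};

// Tiny limit for illustration; the diff uses 1000.
const MAX_BATCHES: usize = 3;

/// Hypothetical stand-in for `FoundBatch`, without the borrowed batch.
#[derive(Debug, PartialEq)]
enum Found {
    Created,
    Existing,
}

/// Hypothetical stand-in for `Batches`, keyed by `u64` instead of `CandidateHash`.
struct Registry {
    batches: HashMap<u64, Vec<u32>>,
}

impl Registry {
    fn new() -> Self {
        Self { batches: HashMap::new() }
    }

    /// Look up a batch, creating it if it does not exist yet.
    ///
    /// As in the diff, the size check runs before the lookup, so once the
    /// limit is reached even lookups of existing batches are rejected.
    fn find_batch(&mut self, key: u64) -> Result<(Found, &mut Vec<u32>), &'static str> {
        if self.batches.len() >= MAX_BATCHES {
            return Err("max batch limit reached")
        }
        Ok(match self.batches.entry(key) {
            Entry::Vacant(vacant) => (Found::Created, vacant.insert(Vec::new())),
            Entry::Occupied(occupied) => (Found::Existing, occupied.into_mut()),
        })
    }
}
```

Using the `entry` API keeps the lookup and the insertion to a single hash computation and hands back a mutable reference in both cases.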
@@ -0,0 +1,204 @@
|
||||
// Copyright 2022 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::{cmp::Ordering, collections::BinaryHeap, time::Instant};
|
||||
|
||||
use futures::future::pending;
|
||||
use futures_timer::Delay;
|
||||
|
||||
/// Wait asynchronously for given `Instant`s one after the other.
|
||||
///
|
||||
/// `PendingWake`s can be inserted and `WaitingQueue` makes `wait_ready()` always wait for the
|
||||
/// next `Instant` in the queue.
|
||||
pub struct WaitingQueue<Payload> {
|
||||
/// All pending wakes we are supposed to wait on in order.
|
||||
pending_wakes: BinaryHeap<PendingWake<Payload>>,
|
||||
/// Wait for next `PendingWake`.
|
||||
timer: Option<Delay>,
|
||||
}
|
||||
|
||||
/// Represents some event waiting to be processed at `ready_at`.
|
||||
///
|
||||
/// This is an event in `WaitingQueue`. It provides an `Ord` instance, that sorts descending with
|
||||
/// regard to `Instant` (so we get a `min-heap` with the earliest `Instant` at the top).
|
||||
#[derive(Eq, PartialEq)]
|
||||
pub struct PendingWake<Payload> {
|
||||
pub payload: Payload,
|
||||
pub ready_at: Instant,
|
||||
}
|
||||
|
||||
impl<Payload: Eq + Ord> WaitingQueue<Payload> {
|
||||
/// Get a new empty `WaitingQueue`.
|
||||
///
|
||||
/// If you call `wait_ready` on this queue immediately, it will always return `Poll::Pending`.
|
||||
pub fn new() -> Self {
|
||||
Self { pending_wakes: BinaryHeap::new(), timer: None }
|
||||
}
|
||||
|
||||
/// Push a `PendingWake`.
|
||||
///
|
||||
/// The next call to `wait_ready` will make sure to wake soon enough to process that new event in a
|
||||
/// timely manner.
|
||||
pub fn push(&mut self, wake: PendingWake<Payload>) {
|
||||
self.pending_wakes.push(wake);
|
||||
// Reset timer as it is potentially obsolete now:
|
||||
self.timer = None;
|
||||
}
|
||||
|
||||
/// Pop the next ready item.
|
||||
///
|
||||
/// This function does not wait: if nothing is ready right now, as determined by the passed
|
||||
/// `now` time stamp, it simply returns `None`.
|
||||
pub fn pop_ready(&mut self, now: Instant) -> Option<PendingWake<Payload>> {
|
||||
let is_ready = self.pending_wakes.peek().map_or(false, |p| p.ready_at <= now);
|
||||
if is_ready {
|
||||
Some(self.pending_wakes.pop().expect("We just peeked. qed."))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Don't pop, just wait until something is ready.
|
||||
///
|
||||
/// Once this function returns `Poll::Ready(())` `pop_ready()` will return `Some`, if passed
|
||||
/// the same `Instant`.
|
||||
///
|
||||
/// Whether ready or not is determined based on the passed time stamp `now` which should be the
|
||||
/// current time as returned by `Instant::now()`
|
||||
///
|
||||
/// This function waits asynchronously for an item to become ready. If there are no more items,
|
||||
/// this call will wait forever (return `Poll::Pending` without scheduling a wake).
|
||||
pub async fn wait_ready(&mut self, now: Instant) {
|
||||
if let Some(timer) = &mut self.timer {
|
||||
// Previous timer was not done yet.
|
||||
timer.await
|
||||
}
|
||||
|
||||
let next_waiting = self.pending_wakes.peek();
|
||||
let is_ready = next_waiting.map_or(false, |p| p.ready_at <= now);
|
||||
if is_ready {
|
||||
return
|
||||
}
|
||||
|
||||
self.timer = next_waiting.map(|p| Delay::new(p.ready_at.duration_since(now)));
|
||||
match &mut self.timer {
|
||||
None => return pending().await,
|
||||
Some(timer) => timer.await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Payload: Eq + Ord> PartialOrd<PendingWake<Payload>> for PendingWake<Payload> {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl<Payload: Ord> Ord for PendingWake<Payload> {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
// Reverse order for min-heap:
|
||||
match other.ready_at.cmp(&self.ready_at) {
|
||||
Ordering::Equal => other.payload.cmp(&self.payload),
|
||||
o => o,
|
||||
}
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{
|
||||
task::Poll,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use futures::{future::poll_fn, pin_mut, Future};
|
||||
|
||||
use crate::LOG_TARGET;
|
||||
|
||||
use super::{PendingWake, WaitingQueue};
|
||||
|
||||
#[test]
|
||||
fn wait_ready_waits_for_earliest_event_always() {
|
||||
sp_tracing::try_init_simple();
|
||||
let mut queue = WaitingQueue::new();
|
||||
let now = Instant::now();
|
||||
let start = now;
|
||||
queue.push(PendingWake { payload: 1u32, ready_at: now + Duration::from_millis(3) });
|
||||
// Push another one in order:
|
||||
queue.push(PendingWake { payload: 2u32, ready_at: now + Duration::from_millis(5) });
|
||||
// Push one out of order:
|
||||
queue.push(PendingWake { payload: 0u32, ready_at: now + Duration::from_millis(1) });
|
||||
// Push another one at same timestamp (should become ready at the same time)
|
||||
queue.push(PendingWake { payload: 10u32, ready_at: now + Duration::from_millis(1) });
|
||||
|
||||
futures::executor::block_on(async move {
|
||||
// No time passed yet - nothing should be ready.
|
||||
assert!(queue.pop_ready(now).is_none(), "No time has passed, nothing should be ready");
|
||||
|
||||
// Receive them in order at expected times:
|
||||
queue.wait_ready(now).await;
|
||||
gum::trace!(target: LOG_TARGET, "After first wait.");
|
||||
|
||||
let now = start + Duration::from_millis(1);
|
||||
assert!(Instant::now() - start >= Duration::from_millis(1));
|
||||
assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(0u32));
|
||||
// One more should be ready:
|
||||
assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(10u32));
|
||||
assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
|
||||
|
||||
queue.wait_ready(now).await;
|
||||
gum::trace!(target: LOG_TARGET, "After second wait.");
|
||||
let now = start + Duration::from_millis(3);
|
||||
assert!(Instant::now() - start >= Duration::from_millis(3));
|
||||
assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(1u32));
|
||||
assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
|
||||
|
||||
// Push in between wait:
|
||||
poll_fn(|cx| {
|
||||
let fut = queue.wait_ready(now);
|
||||
pin_mut!(fut);
|
||||
assert_matches!(fut.poll(cx), Poll::Pending);
|
||||
Poll::Ready(())
|
||||
})
|
||||
.await;
|
||||
queue.push(PendingWake { payload: 3u32, ready_at: start + Duration::from_millis(4) });
|
||||
|
||||
queue.wait_ready(now).await;
|
||||
// Newly pushed element should have become ready:
|
||||
gum::trace!(target: LOG_TARGET, "After third wait.");
|
||||
let now = start + Duration::from_millis(4);
|
||||
assert!(Instant::now() - start >= Duration::from_millis(4));
|
||||
assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(3u32));
|
||||
assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
|
||||
|
||||
queue.wait_ready(now).await;
|
||||
gum::trace!(target: LOG_TARGET, "After fourth wait.");
|
||||
let now = start + Duration::from_millis(5);
|
||||
assert!(Instant::now() - start >= Duration::from_millis(5));
|
||||
assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(2u32));
|
||||
assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
|
||||
|
||||
// queue empty - should wait forever now:
|
||||
poll_fn(|cx| {
|
||||
let fut = queue.wait_ready(now);
|
||||
pin_mut!(fut);
|
||||
assert_matches!(fut.poll(cx), Poll::Pending);
|
||||
Poll::Ready(())
|
||||
})
|
||||
.await;
|
||||
});
|
||||
}
|
||||
}
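The test above exercises a queue whose entries become poppable only once their `ready_at` deadline has passed. The following is a minimal, synchronous sketch of that behaviour using only the standard library. The names `SketchWake` and `SketchQueue` are hypothetical, the async `wait_ready` part is left out, and popping equal deadlines in insertion order is an assumption of this sketch, not something the commit is known to guarantee.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::Instant;

/// Hypothetical stand-in for the `PendingWake` used in the test above.
#[derive(Debug, PartialEq, Eq)]
pub struct SketchWake {
    pub payload: u32,
    pub ready_at: Instant,
}

/// Min-heap on `ready_at`: the entry that becomes ready first sits on top.
#[derive(Default)]
pub struct SketchQueue {
    // The sequence number breaks ties, so equal deadlines pop in insertion order
    // (an assumption of this sketch).
    heap: BinaryHeap<Reverse<(Instant, u64, u32)>>,
    next_seq: u64,
}

impl SketchQueue {
    /// Queue an entry together with its deadline.
    pub fn push(&mut self, wake: SketchWake) {
        self.heap.push(Reverse((wake.ready_at, self.next_seq, wake.payload)));
        self.next_seq += 1;
    }

    /// Pop the earliest entry, but only if its deadline is not after `now`.
    pub fn pop_ready(&mut self, now: Instant) -> Option<SketchWake> {
        let ready = match self.heap.peek() {
            Some(Reverse((ready_at, _, _))) => *ready_at <= now,
            None => false,
        };
        if !ready {
            return None
        }
        self.heap
            .pop()
            .map(|Reverse((ready_at, _, payload))| SketchWake { payload, ready_at })
    }
}
```

Because `pop_ready` takes `now` as a parameter, a test can step through deadlines without sleeping, which is the same idea the test above relies on when it computes `now` from `start`.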
|
||||
@@ -19,8 +19,10 @@
|
||||
|
||||
use fatality::Nested;
|
||||
|
||||
use gum::CandidateHash;
|
||||
use polkadot_node_network_protocol::{request_response::incoming, PeerId};
|
||||
use polkadot_node_subsystem_util::runtime;
|
||||
use polkadot_primitives::v2::AuthorityDiscoveryId;
|
||||
|
||||
use crate::LOG_TARGET;
|
||||
|
||||
@@ -35,8 +37,8 @@ pub enum Error {
|
||||
#[error("Retrieving next incoming request failed.")]
|
||||
IncomingRequest(#[from] incoming::Error),
|
||||
|
||||
#[error("Sending back response to peer {0} failed.")]
|
||||
SendResponse(PeerId),
|
||||
#[error("Sending back response to peers {0:#?} failed.")]
|
||||
SendResponses(Vec<PeerId>),
|
||||
|
||||
#[error("Changing peer's ({0}) reputation failed.")]
|
||||
SetPeerReputation(PeerId),
|
||||
@@ -44,16 +46,29 @@ pub enum Error {
|
||||
#[error("Dispute request with invalid signatures, from peer {0}.")]
|
||||
InvalidSignature(PeerId),
|
||||
|
||||
#[error("Import of dispute got canceled for peer {0} - import failed for some reason.")]
|
||||
ImportCanceled(PeerId),
|
||||
#[error("Received votes from peer {0} have been completely redundant.")]
|
||||
RedundantMessage(PeerId),
|
||||
|
||||
#[error("Import of dispute got canceled for candidate {0} - import failed for some reason.")]
|
||||
ImportCanceled(CandidateHash),
|
||||
|
||||
#[error("Peer {0} attempted to participate in dispute and is not a validator.")]
|
||||
NotAValidator(PeerId),
|
||||
|
||||
#[error("Force flush for batch that could not be found attempted, candidate hash: {0}")]
|
||||
ForceFlushBatchDoesNotExist(CandidateHash),
|
||||
|
||||
// Should never happen in practice:
|
||||
#[error("We needed to drop messages, because we reached limit on concurrent batches.")]
|
||||
MaxBatchLimitReached,
|
||||
|
||||
#[error("Authority {0} sent messages at a too high rate.")]
|
||||
AuthorityFlooding(AuthorityDiscoveryId),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
pub type JfyiErrorResult<T> = std::result::Result<T, JfyiError>;
|
||||
pub type JfyiResult<T> = std::result::Result<T, JfyiError>;
|
||||
|
||||
/// Utility for eating top level errors and logging them.
|
||||
///
|
||||
|
||||
@@ -15,21 +15,21 @@
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
num::NonZeroUsize,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use futures::{
|
||||
channel::oneshot,
|
||||
future::{poll_fn, BoxFuture},
|
||||
future::poll_fn,
|
||||
pin_mut,
|
||||
stream::{FusedStream, FuturesUnordered, StreamExt},
|
||||
Future, FutureExt, Stream,
|
||||
stream::{FuturesUnordered, StreamExt},
|
||||
Future,
|
||||
};
|
||||
use lru::LruCache;
|
||||
|
||||
use gum::CandidateHash;
|
||||
use polkadot_node_network_protocol::{
|
||||
authority_discovery::AuthorityDiscovery,
|
||||
request_response::{
|
||||
@@ -52,25 +52,47 @@ use crate::{
|
||||
};
|
||||
|
||||
mod error;
|
||||
use self::error::{log_error, JfyiError, JfyiErrorResult, Result};
|
||||
|
||||
/// Rate limiting queues for incoming requests by peers.
|
||||
mod peer_queues;
|
||||
|
||||
/// Batch imports together.
|
||||
mod batches;
|
||||
|
||||
use self::{
|
||||
batches::{Batches, FoundBatch, PreparedImport},
|
||||
error::{log_error, JfyiError, JfyiResult, Result},
|
||||
peer_queues::PeerQueues,
|
||||
};
|
||||
|
||||
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Received message could not be decoded.");
|
||||
const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Signatures were invalid.");
|
||||
const COST_INVALID_CANDIDATE: Rep = Rep::Malicious("Reported candidate was not available.");
|
||||
const COST_INVALID_IMPORT: Rep =
|
||||
Rep::Malicious("Import was deemed invalid by dispute-coordinator.");
|
||||
const COST_NOT_A_VALIDATOR: Rep = Rep::CostMajor("Reporting peer was not a validator.");
|
||||
/// Mildly punish peers exceeding their rate limit.
|
||||
///
|
||||
/// For honest peers this should rarely happen, but if it happens we would not want to disconnect
|
||||
/// too quickly. Minor cost should suffice for disconnecting any real flooder.
|
||||
const COST_APPARENT_FLOOD: Rep = Rep::CostMinor("Peer exceeded the rate limit.");
|
||||
|
||||
/// How many statement imports we want to issue in parallel:
|
||||
pub const MAX_PARALLEL_IMPORTS: usize = 10;
|
||||
/// How many votes must have arrived in the last `BATCH_COLLECTING_INTERVAL`
/// in order for a batch to stay alive and not get flushed/imported to the dispute-coordinator.
|
||||
///
|
||||
/// This ensures a timely import of batches.
|
||||
#[cfg(not(test))]
|
||||
pub const MIN_KEEP_BATCH_ALIVE_VOTES: u32 = 10;
|
||||
#[cfg(test)]
|
||||
pub const MIN_KEEP_BATCH_ALIVE_VOTES: u32 = 2;
|
||||
|
||||
const BANNED_PEERS_CACHE_SIZE: NonZeroUsize = match NonZeroUsize::new(MAX_PARALLEL_IMPORTS) {
|
||||
Some(cap) => cap,
|
||||
None => panic!("Banned peers cache size should not be 0."),
|
||||
};
|
||||
/// Time we allow to pass for new votes to trickle in.
|
||||
///
|
||||
/// See `MIN_KEEP_BATCH_ALIVE_VOTES` above.
|
||||
/// Should be greater than or equal to `RECEIVE_RATE_LIMIT` (there is no point in checking any faster).
|
||||
pub const BATCH_COLLECTING_INTERVAL: Duration = Duration::from_millis(500);
|
||||
|
||||
/// State for handling incoming `DisputeRequest` messages.
|
||||
///
|
||||
/// This is supposed to run as its own task in order to easily impose back pressure on the incoming
|
||||
/// request channel and at the same time to drop flood messages as fast as possible.
|
||||
pub struct DisputesReceiver<Sender, AD> {
|
||||
/// Access to session information.
|
||||
runtime: RuntimeInfo,
|
||||
@@ -81,18 +103,17 @@ pub struct DisputesReceiver<Sender, AD> {
|
||||
/// Channel to retrieve incoming requests from.
|
||||
receiver: IncomingRequestReceiver<DisputeRequest>,
|
||||
|
||||
/// Rate limiting queue for each peer (only authorities).
|
||||
peer_queues: PeerQueues,
|
||||
|
||||
/// Currently active batches of imports per candidate.
|
||||
batches: Batches,
|
||||
|
||||
/// Authority discovery service:
|
||||
authority_discovery: AD,
|
||||
|
||||
/// Imports currently being processed.
|
||||
pending_imports: PendingImports,
|
||||
|
||||
/// We keep record of the last banned peers.
|
||||
///
|
||||
/// This is needed because once we ban a peer, we will very likely still have pending requests
|
||||
/// in the incoming channel - we should not waste time recovering availability for those, as we
|
||||
/// already know the peer is malicious.
|
||||
banned_peers: LruCache<PeerId, ()>,
|
||||
/// Imports currently being processed by the `dispute-coordinator`.
|
||||
pending_imports: FuturesUnordered<PendingImport>,
|
||||
|
||||
/// Log received requests.
|
||||
metrics: Metrics,
|
||||
@@ -106,36 +127,24 @@ enum MuxedMessage {
|
||||
///
|
||||
/// - We need to make sure responses are actually sent (therefore we need to await futures
|
||||
/// promptly).
|
||||
/// - We need to update `banned_peers` accordingly to the result.
|
||||
ConfirmedImport(JfyiErrorResult<(PeerId, ImportStatementsResult)>),
|
||||
/// - We need to punish peers whose import got rejected.
|
||||
ConfirmedImport(ImportResult),
|
||||
|
||||
/// A new request has arrived and should be handled.
|
||||
NewRequest(IncomingRequest<DisputeRequest>),
|
||||
}
|
||||
|
||||
impl MuxedMessage {
|
||||
async fn receive(
|
||||
pending_imports: &mut PendingImports,
|
||||
pending_requests: &mut IncomingRequestReceiver<DisputeRequest>,
|
||||
) -> Result<MuxedMessage> {
|
||||
poll_fn(|ctx| {
|
||||
let next_req = pending_requests.recv(|| vec![COST_INVALID_REQUEST]);
|
||||
pin_mut!(next_req);
|
||||
if let Poll::Ready(r) = next_req.poll(ctx) {
|
||||
return match r {
|
||||
Err(e) => Poll::Ready(Err(incoming::Error::from(e).into())),
|
||||
Ok(v) => Poll::Ready(Ok(Self::NewRequest(v))),
|
||||
}
|
||||
}
|
||||
// In case of Ready(None) return `Pending` below - we want to wait for the next request
|
||||
// in that case.
|
||||
if let Poll::Ready(Some(v)) = pending_imports.poll_next_unpin(ctx) {
|
||||
return Poll::Ready(Ok(Self::ConfirmedImport(v)))
|
||||
}
|
||||
Poll::Pending
|
||||
})
|
||||
.await
|
||||
}
|
||||
/// Rate limit timer hit - it is time to process one row of messages.
|
||||
///
|
||||
/// This is the result of calling `self.peer_queues.pop_reqs()`.
|
||||
WakePeerQueuesPopReqs(Vec<IncomingRequest<DisputeRequest>>),
|
||||
|
||||
/// It is time to check batches.
|
||||
///
|
||||
/// Every `BATCH_COLLECTING_INTERVAL` we check whether fewer than `MIN_KEEP_BATCH_ALIVE_VOTES`
/// new votes arrived. If so, the batch is ready for import.
|
||||
///
|
||||
/// This is the result of calling `self.batches.check_batches()`.
|
||||
WakeCheckBatches(Vec<PreparedImport>),
|
||||
}
|
||||
|
||||
impl<Sender, AD> DisputesReceiver<Sender, AD>
|
||||
@@ -159,11 +168,10 @@ where
|
||||
runtime,
|
||||
sender,
|
||||
receiver,
|
||||
peer_queues: PeerQueues::new(),
|
||||
batches: Batches::new(),
|
||||
authority_discovery,
|
||||
pending_imports: PendingImports::new(),
|
||||
// Size of MAX_PARALLEL_IMPORTS ensures we are going to immediately get rid of any
|
||||
// malicious requests still pending in the incoming queue.
|
||||
banned_peers: LruCache::new(BANNED_PEERS_CACHE_SIZE),
|
||||
pending_imports: FuturesUnordered::new(),
|
||||
metrics,
|
||||
}
|
||||
}
|
||||
@@ -187,60 +195,132 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Actual work happening here.
|
||||
/// Actual work happening here in three phases:
|
||||
///
|
||||
/// 1. Receive and queue incoming messages until the rate limit timer hits.
|
||||
/// 2. Do import/batching for the head of all queues.
|
||||
/// 3. Check and flush any ready batches.
|
||||
async fn run_inner(&mut self) -> Result<()> {
|
||||
let msg = MuxedMessage::receive(&mut self.pending_imports, &mut self.receiver).await?;
|
||||
let msg = self.receive_message().await?;
|
||||
|
||||
let incoming = match msg {
|
||||
// We need to clean up futures, to make sure responses are sent:
|
||||
MuxedMessage::ConfirmedImport(m_bad) => {
|
||||
self.ban_bad_peer(m_bad)?;
|
||||
return Ok(())
|
||||
match msg {
|
||||
MuxedMessage::NewRequest(req) => {
|
||||
// Phase 1:
|
||||
self.metrics.on_received_request();
|
||||
self.dispatch_to_queues(req).await?;
|
||||
},
|
||||
MuxedMessage::NewRequest(req) => req,
|
||||
};
|
||||
MuxedMessage::WakePeerQueuesPopReqs(reqs) => {
|
||||
// Phase 2:
|
||||
for req in reqs {
|
||||
// No early return - we cannot cancel imports of one peer, because the import of
|
||||
// another failed:
|
||||
match log_error(self.start_import_or_batch(req).await) {
|
||||
Ok(()) => {},
|
||||
Err(fatal) => return Err(fatal.into()),
|
||||
}
|
||||
}
|
||||
},
|
||||
MuxedMessage::WakeCheckBatches(ready_imports) => {
|
||||
// Phase 3:
|
||||
self.import_ready_batches(ready_imports).await;
|
||||
},
|
||||
MuxedMessage::ConfirmedImport(import_result) => {
|
||||
self.update_imported_requests_metrics(&import_result);
|
||||
// Confirm imports to requesters/punish them on invalid imports:
|
||||
send_responses_to_requesters(import_result).await?;
|
||||
},
|
||||
}
|
||||
|
||||
self.metrics.on_received_request();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let peer = incoming.peer;
|
||||
/// Receive one `MuxedMessage`.
|
||||
///
|
||||
/// Dispatching events to messages as they happen.
|
||||
async fn receive_message(&mut self) -> Result<MuxedMessage> {
|
||||
poll_fn(|ctx| {
|
||||
// In case of Ready(None), we want to wait for pending requests:
|
||||
if let Poll::Ready(Some(v)) = self.pending_imports.poll_next_unpin(ctx) {
|
||||
return Poll::Ready(Ok(MuxedMessage::ConfirmedImport(v?)))
|
||||
}
|
||||
|
||||
// Only accept messages from validators:
|
||||
if self.authority_discovery.get_authority_ids_by_peer_id(peer).await.is_none() {
|
||||
incoming
|
||||
.send_outgoing_response(OutgoingResponse {
|
||||
let rate_limited = self.peer_queues.pop_reqs();
|
||||
pin_mut!(rate_limited);
|
||||
// We poll rate_limit before batches, so we don't unnecessarily delay importing to
|
||||
// batches.
|
||||
if let Poll::Ready(reqs) = rate_limited.poll(ctx) {
|
||||
return Poll::Ready(Ok(MuxedMessage::WakePeerQueuesPopReqs(reqs)))
|
||||
}
|
||||
|
||||
let ready_batches = self.batches.check_batches();
|
||||
pin_mut!(ready_batches);
|
||||
if let Poll::Ready(ready_batches) = ready_batches.poll(ctx) {
|
||||
return Poll::Ready(Ok(MuxedMessage::WakeCheckBatches(ready_batches)))
|
||||
}
|
||||
|
||||
let next_req = self.receiver.recv(|| vec![COST_INVALID_REQUEST]);
|
||||
pin_mut!(next_req);
|
||||
if let Poll::Ready(r) = next_req.poll(ctx) {
|
||||
return match r {
|
||||
Err(e) => Poll::Ready(Err(incoming::Error::from(e).into())),
|
||||
Ok(v) => Poll::Ready(Ok(MuxedMessage::NewRequest(v))),
|
||||
}
|
||||
}
|
||||
Poll::Pending
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Process incoming requests.
|
||||
///
|
||||
/// - Check sender is authority
|
||||
/// - Dispatch message to corresponding queue in `peer_queues`.
|
||||
/// - If queue is full, drop message and change reputation of sender.
|
||||
async fn dispatch_to_queues(&mut self, req: IncomingRequest<DisputeRequest>) -> JfyiResult<()> {
|
||||
let peer = req.peer;
|
||||
// Only accept messages from validators, in case there are multiple `AuthorityId`s, we
|
||||
// just take the first one. On session boundaries this might allow validators to double
|
||||
// their rate limit for a short period of time, which seems acceptable.
|
||||
let authority_id = match self
|
||||
.authority_discovery
|
||||
.get_authority_ids_by_peer_id(peer)
|
||||
.await
|
||||
.and_then(|s| s.into_iter().next())
|
||||
{
|
||||
None => {
|
||||
req.send_outgoing_response(OutgoingResponse {
|
||||
result: Err(()),
|
||||
reputation_changes: vec![COST_NOT_A_VALIDATOR],
|
||||
sent_feedback: None,
|
||||
})
|
||||
.map_err(|_| JfyiError::SendResponse(peer))?;
|
||||
.map_err(|_| JfyiError::SendResponses(vec![peer]))?;
|
||||
return Err(JfyiError::NotAValidator(peer).into())
|
||||
},
|
||||
Some(auth_id) => auth_id,
|
||||
};
|
||||
|
||||
return Err(JfyiError::NotAValidator(peer).into())
|
||||
// Queue request:
|
||||
if let Err((authority_id, req)) = self.peer_queues.push_req(authority_id, req) {
|
||||
req.send_outgoing_response(OutgoingResponse {
|
||||
result: Err(()),
|
||||
reputation_changes: vec![COST_APPARENT_FLOOD],
|
||||
sent_feedback: None,
|
||||
})
|
||||
.map_err(|_| JfyiError::SendResponses(vec![peer]))?;
|
||||
return Err(JfyiError::AuthorityFlooding(authority_id))
|
||||
}
|
||||
|
||||
// Immediately drop requests from peers that already have requests in flight or have
|
||||
// been banned recently (flood protection):
|
||||
if self.pending_imports.peer_is_pending(&peer) || self.banned_peers.contains(&peer) {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?peer,
|
||||
"Dropping message from peer (banned/pending import)"
|
||||
);
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
// Wait for a free slot:
|
||||
if self.pending_imports.len() >= MAX_PARALLEL_IMPORTS {
|
||||
// Wait for one to finish:
|
||||
let r = self.pending_imports.next().await;
|
||||
self.ban_bad_peer(r.expect("pending_imports.len() is greater 0. qed."))?;
|
||||
}
|
||||
|
||||
// All good - initiate import.
|
||||
self.start_import(incoming).await
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Start importing votes for the given request.
|
||||
async fn start_import(&mut self, incoming: IncomingRequest<DisputeRequest>) -> Result<()> {
|
||||
/// Start importing votes for the given request or batch.
|
||||
///
|
||||
/// Checks signatures first. If a batch already exists for the candidate, the votes are added
/// to that batch; otherwise they are imported into the `dispute-coordinator` directly and a
/// new batch is opened.
|
||||
async fn start_import_or_batch(
|
||||
&mut self,
|
||||
incoming: IncomingRequest<DisputeRequest>,
|
||||
) -> Result<()> {
|
||||
let IncomingRequest { peer, payload, pending_response } = incoming;
|
||||
|
||||
let info = self
|
||||
@@ -270,128 +350,172 @@ where
|
||||
Ok(votes) => votes,
|
||||
};
|
||||
|
||||
let candidate_hash = *valid_vote.0.candidate_hash();
|
||||
|
||||
match self.batches.find_batch(candidate_hash, candidate_receipt)? {
|
||||
FoundBatch::Created(batch) => {
|
||||
// There was no entry yet - start import immediately:
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?peer,
|
||||
"No batch yet - triggering immediate import"
|
||||
);
|
||||
let import = PreparedImport {
|
||||
candidate_receipt: batch.candidate_receipt().clone(),
|
||||
statements: vec![valid_vote, invalid_vote],
|
||||
requesters: vec![(peer, pending_response)],
|
||||
};
|
||||
self.start_import(import).await;
|
||||
},
|
||||
FoundBatch::Found(batch) => {
|
||||
gum::trace!(target: LOG_TARGET, ?candidate_hash, "Batch exists - batching request");
|
||||
let batch_result =
|
||||
batch.add_votes(valid_vote, invalid_vote, peer, pending_response);
|
||||
|
||||
if let Err(pending_response) = batch_result {
|
||||
// We don't expect honest peers to send redundant votes within a single batch,
|
||||
// as the timeout for retry is much higher. Still we don't want to punish the
|
||||
// node as it might not be the node's fault. Some other (malicious) node could have been
|
||||
// faster sending the same votes in order to harm the reputation of that honest
|
||||
// node. Given that we already have a rate limit, if a validator chooses to
|
||||
// waste available rate with redundant votes - so be it. The actual dispute
|
||||
// resolution is unaffected.
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
?peer,
|
||||
"Peer sent completely redundant votes within a single batch - that looks fishy!",
|
||||
);
|
||||
pending_response
|
||||
.send_outgoing_response(OutgoingResponse {
|
||||
// While we have seen duplicate votes, we cannot confirm as we don't
|
||||
// know yet whether the batch is going to be confirmed, so we assume
|
||||
// the worst. We don't want to push the pending response to the batch
|
||||
// either as that would be unbounded, only limited by the rate limit.
|
||||
result: Err(()),
|
||||
reputation_changes: Vec::new(),
|
||||
sent_feedback: None,
|
||||
})
|
||||
.map_err(|_| JfyiError::SendResponses(vec![peer]))?;
|
||||
return Err(From::from(JfyiError::RedundantMessage(peer)))
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Trigger import into the dispute-coordinator of ready batches (`PreparedImport`s).
|
||||
async fn import_ready_batches(&mut self, ready_imports: Vec<PreparedImport>) {
|
||||
for import in ready_imports {
|
||||
self.start_import(import).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Start import and add response receiver to `pending_imports`.
|
||||
async fn start_import(&mut self, import: PreparedImport) {
|
||||
let PreparedImport { candidate_receipt, statements, requesters } = import;
|
||||
let (session_index, candidate_hash) = match statements.iter().next() {
|
||||
None => {
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
candidate_hash = ?candidate_receipt.hash(),
|
||||
"Not importing empty batch"
|
||||
);
|
||||
return
|
||||
},
|
||||
Some(vote) => (vote.0.session_index(), vote.0.candidate_hash().clone()),
|
||||
};
|
||||
|
||||
let (pending_confirmation, confirmation_rx) = oneshot::channel();
|
||||
self.sender
|
||||
.send_message(DisputeCoordinatorMessage::ImportStatements {
|
||||
candidate_receipt,
|
||||
session: valid_vote.0.session_index(),
|
||||
statements: vec![valid_vote, invalid_vote],
|
||||
session: session_index,
|
||||
statements,
|
||||
pending_confirmation: Some(pending_confirmation),
|
||||
})
|
||||
.await;
|
||||
|
||||
self.pending_imports.push(peer, confirmation_rx, pending_response);
|
||||
Ok(())
|
||||
let pending =
|
||||
PendingImport { candidate_hash, requesters, pending_response: confirmation_rx };
|
||||
|
||||
self.pending_imports.push(pending);
|
||||
}
|
||||
|
||||
/// Await an import and ban any misbehaving peers.
|
||||
///
|
||||
/// In addition we report import metrics.
|
||||
fn ban_bad_peer(
|
||||
&mut self,
|
||||
result: JfyiErrorResult<(PeerId, ImportStatementsResult)>,
|
||||
) -> JfyiErrorResult<()> {
|
||||
match result? {
|
||||
(_, ImportStatementsResult::ValidImport) => {
|
||||
self.metrics.on_imported(SUCCEEDED);
|
||||
},
|
||||
(bad_peer, ImportStatementsResult::InvalidImport) => {
|
||||
self.metrics.on_imported(FAILED);
|
||||
self.banned_peers.put(bad_peer, ());
|
||||
},
|
||||
}
|
||||
Ok(())
|
||||
fn update_imported_requests_metrics(&self, result: &ImportResult) {
|
||||
let label = match result.result {
|
||||
ImportStatementsResult::ValidImport => SUCCEEDED,
|
||||
ImportStatementsResult::InvalidImport => FAILED,
|
||||
};
|
||||
self.metrics.on_imported(label, result.requesters.len());
|
||||
}
|
||||
}
|
||||
|
||||
/// Manage pending imports in a way that preserves invariants.
|
||||
struct PendingImports {
|
||||
/// Futures in flight.
|
||||
futures:
|
||||
FuturesUnordered<BoxFuture<'static, (PeerId, JfyiErrorResult<ImportStatementsResult>)>>,
|
||||
/// Peers whose requests are currently in flight.
|
||||
peers: HashSet<PeerId>,
|
||||
}
|
||||
async fn send_responses_to_requesters(import_result: ImportResult) -> JfyiResult<()> {
|
||||
let ImportResult { requesters, result } = import_result;
|
||||
|
||||
impl PendingImports {
|
||||
pub fn new() -> Self {
|
||||
Self { futures: FuturesUnordered::new(), peers: HashSet::new() }
|
||||
}
|
||||
|
||||
pub fn push(
|
||||
&mut self,
|
||||
peer: PeerId,
|
||||
handled: oneshot::Receiver<ImportStatementsResult>,
|
||||
pending_response: OutgoingResponseSender<DisputeRequest>,
|
||||
) {
|
||||
self.peers.insert(peer);
|
||||
self.futures.push(
|
||||
async move {
|
||||
let r = respond_to_request(peer, handled, pending_response).await;
|
||||
(peer, r)
|
||||
}
|
||||
.boxed(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns the number of contained futures.
|
||||
pub fn len(&self) -> usize {
|
||||
self.futures.len()
|
||||
}
|
||||
|
||||
/// Check whether a peer has a pending import.
|
||||
pub fn peer_is_pending(&self, peer: &PeerId) -> bool {
|
||||
self.peers.contains(peer)
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for PendingImports {
|
||||
type Item = JfyiErrorResult<(PeerId, ImportStatementsResult)>;
|
||||
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
match Pin::new(&mut self.futures).poll_next(ctx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Ready(Some((peer, result))) => {
|
||||
self.peers.remove(&peer);
|
||||
Poll::Ready(Some(result.map(|r| (peer, r))))
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
impl FusedStream for PendingImports {
|
||||
fn is_terminated(&self) -> bool {
|
||||
self.futures.is_terminated()
|
||||
}
|
||||
}
|
||||
|
||||
// Future for `PendingImports`
|
||||
//
|
||||
// - Wait for import
|
||||
// - Punish peer
|
||||
// - Deliver result
|
||||
async fn respond_to_request(
|
||||
peer: PeerId,
|
||||
handled: oneshot::Receiver<ImportStatementsResult>,
|
||||
pending_response: OutgoingResponseSender<DisputeRequest>,
|
||||
) -> JfyiErrorResult<ImportStatementsResult> {
|
||||
let result = handled.await.map_err(|_| JfyiError::ImportCanceled(peer))?;
|
||||
|
||||
let response = match result {
|
||||
ImportStatementsResult::ValidImport => OutgoingResponse {
|
||||
let mk_response = match result {
|
||||
ImportStatementsResult::ValidImport => || OutgoingResponse {
|
||||
result: Ok(DisputeResponse::Confirmed),
|
||||
reputation_changes: Vec::new(),
|
||||
sent_feedback: None,
|
||||
},
|
||||
ImportStatementsResult::InvalidImport => OutgoingResponse {
|
||||
ImportStatementsResult::InvalidImport => || OutgoingResponse {
|
||||
result: Err(()),
|
||||
reputation_changes: vec![COST_INVALID_CANDIDATE],
|
||||
reputation_changes: vec![COST_INVALID_IMPORT],
|
||||
sent_feedback: None,
|
||||
},
|
||||
};
|
||||
|
||||
pending_response
|
||||
.send_outgoing_response(response)
|
||||
.map_err(|_| JfyiError::SendResponse(peer))?;
|
||||
let mut sending_failed_for = Vec::new();
|
||||
for (peer, pending_response) in requesters {
|
||||
if let Err(()) = pending_response.send_outgoing_response(mk_response()) {
|
||||
sending_failed_for.push(peer);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
if !sending_failed_for.is_empty() {
|
||||
Err(JfyiError::SendResponses(sending_failed_for))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A future that resolves into an `ImportResult` when ready.
|
||||
///
|
||||
/// This future is used on `dispute-coordinator` import messages for the oneshot response receiver
|
||||
/// to:
|
||||
/// - Keep track of concerned `CandidateHash` for reporting errors.
|
||||
/// - Keep track of requesting peers so we can confirm the import/punish them on invalid imports.
|
||||
struct PendingImport {
|
||||
candidate_hash: CandidateHash,
|
||||
requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
|
||||
pending_response: oneshot::Receiver<ImportStatementsResult>,
|
||||
}
|
||||
|
||||
/// A `PendingImport` becomes an `ImportResult` once done.
|
||||
struct ImportResult {
|
||||
/// Requesters of that import.
|
||||
requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
|
||||
/// Actual result of the import.
|
||||
result: ImportStatementsResult,
|
||||
}
|
||||
|
||||
impl PendingImport {
|
||||
async fn wait_for_result(&mut self) -> JfyiResult<ImportResult> {
|
||||
let result = (&mut self.pending_response)
|
||||
.await
|
||||
.map_err(|_| JfyiError::ImportCanceled(self.candidate_hash))?;
|
||||
Ok(ImportResult { requesters: std::mem::take(&mut self.requesters), result })
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for PendingImport {
|
||||
type Output = JfyiResult<ImportResult>;
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let fut = self.wait_for_result();
|
||||
pin_mut!(fut);
|
||||
fut.poll(cx)
|
||||
}
|
||||
}
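With batching, one import result has to be reported back to every peer that contributed to the batch, and a failed send to one peer must not stop the others from being answered. The following is a small sketch of that fan-out pattern using only the standard library. `SketchResult` and `respond_all` are hypothetical names, `u32` stands in for `PeerId`, and a `std::sync::mpsc::Sender` stands in for the real response sender, so this illustrates the idea rather than the code in this commit.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for `ImportStatementsResult`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SketchResult {
    Valid,
    Invalid,
}

/// Answer every requester of a batch with the same verdict.
///
/// All requesters are attempted. Peers whose response could not be delivered
/// (here: the receiving end was dropped) are collected and returned as the error.
pub fn respond_all(
    requesters: Vec<(u32, Sender<SketchResult>)>,
    result: SketchResult,
) -> Result<(), Vec<u32>> {
    let failed: Vec<u32> = requesters
        .into_iter()
        .filter_map(|(peer, tx)| tx.send(result).err().map(|_| peer))
        .collect();

    if failed.is_empty() {
        Ok(())
    } else {
        Err(failed)
    }
}
```

Collecting the failures instead of returning on the first one mirrors the choice of a single error variant carrying a list of peers, as `SendResponses(Vec<PeerId>)` does above.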
|
||||
|
||||
@@ -0,0 +1,141 @@
|
||||
// Copyright 2022 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::collections::{hash_map::Entry, HashMap, VecDeque};
|
||||
|
||||
use futures::future::pending;
|
||||
use futures_timer::Delay;
|
||||
use polkadot_node_network_protocol::request_response::{v1::DisputeRequest, IncomingRequest};
|
||||
use polkadot_primitives::v2::AuthorityDiscoveryId;
|
||||
|
||||
use crate::RECEIVE_RATE_LIMIT;
|
||||
|
||||
/// How many messages we are willing to queue per peer (validator).
|
||||
///
|
||||
/// The larger this value is, the larger bursts are allowed to be without us dropping messages. On
|
||||
/// the flip side this gets allocated per validator, so for a size of 10 this will result
|
||||
/// in `10_000 * size_of(IncomingRequest)` in the worst case.
|
||||
///
|
||||
/// `PEER_QUEUE_CAPACITY` must not be 0 for obvious reasons.
|
||||
#[cfg(not(test))]
|
||||
pub const PEER_QUEUE_CAPACITY: usize = 10;
|
||||
#[cfg(test)]
|
||||
pub const PEER_QUEUE_CAPACITY: usize = 2;
|
||||
|
||||
/// Queues for messages from authority peers for rate limiting.
|
||||
///
|
||||
/// Invariants ensured:
|
||||
///
|
||||
/// 1. No queue will ever have more than `PEER_QUEUE_CAPACITY` elements.
|
||||
/// 2. There are no empty queues. Whenever a queue gets empty, it is removed. This way checking
|
||||
/// whether there are any messages queued is cheap.
|
||||
/// 3. As long as not empty, `pop_reqs` will, if called in sequence, not return `Ready` more often
|
||||
/// than once for every `RECEIVE_RATE_LIMIT`, but it will always return Ready eventually.
|
||||
/// 4. If empty `pop_reqs` will never return `Ready`, but will always be `Pending`.
|
||||
pub struct PeerQueues {
|
||||
/// Actual queues.
|
||||
queues: HashMap<AuthorityDiscoveryId, VecDeque<IncomingRequest<DisputeRequest>>>,
|
||||
|
||||
/// Delay timer for establishing the rate limit.
|
||||
rate_limit_timer: Option<Delay>,
|
||||
}
|
||||
|
||||
impl PeerQueues {
|
||||
/// New empty `PeerQueues`.
|
||||
pub fn new() -> Self {
|
||||
Self { queues: HashMap::new(), rate_limit_timer: None }
|
||||
}
|
||||
|
||||
/// Push an incoming request for a given authority.
|
||||
///
|
||||
/// Returns: `Ok(())` if succeeded, `Err((args))` if capacity is reached.
|
||||
pub fn push_req(
|
||||
&mut self,
|
||||
peer: AuthorityDiscoveryId,
|
||||
req: IncomingRequest<DisputeRequest>,
|
||||
) -> Result<(), (AuthorityDiscoveryId, IncomingRequest<DisputeRequest>)> {
|
||||
let queue = match self.queues.entry(peer) {
|
||||
Entry::Vacant(vacant) => vacant.insert(VecDeque::new()),
|
||||
Entry::Occupied(occupied) => {
|
||||
if occupied.get().len() >= PEER_QUEUE_CAPACITY {
|
||||
return Err((occupied.key().clone(), req))
|
||||
}
|
||||
occupied.into_mut()
|
||||
},
|
||||
};
|
||||
queue.push_back(req);
|
||||
|
||||
// We have at least one element to process - rate limit `timer` needs to exist now:
|
||||
self.ensure_timer();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Pop all heads and return them for processing.
|
||||
///
|
||||
/// This gets one message from each peer that has sent at least one.
|
||||
///
|
||||
/// This function is rate limited: if called in sequence, it will not return more often than
/// once every `RECEIVE_RATE_LIMIT`.
|
||||
///
|
||||
/// NOTE: If empty this function will not return `Ready` at all, but will always be `Pending`.
|
||||
pub async fn pop_reqs(&mut self) -> Vec<IncomingRequest<DisputeRequest>> {
|
||||
self.wait_for_timer().await;
|
||||
|
||||
let mut heads = Vec::with_capacity(self.queues.len());
|
||||
let old_queues = std::mem::replace(&mut self.queues, HashMap::new());
|
||||
for (k, mut queue) in old_queues.into_iter() {
|
||||
let front = queue.pop_front();
|
||||
debug_assert!(front.is_some(), "Invariant that queues are never empty is broken.");
|
||||
|
||||
if let Some(front) = front {
|
||||
heads.push(front);
|
||||
}
|
||||
if !queue.is_empty() {
|
||||
self.queues.insert(k, queue);
|
||||
}
|
||||
}
|
||||
|
||||
if !self.is_empty() {
|
||||
// Still not empty - we should get woken at some point.
|
||||
self.ensure_timer();
|
||||
}
|
||||
|
||||
heads
|
||||
}
|
||||
|
||||
/// Whether or not all queues are empty.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.queues.is_empty()
|
||||
}
|
||||
|
||||
/// Ensure there is an active `timer`.
|
||||
///
|
||||
/// Checks whether one exists and if not creates one.
|
||||
fn ensure_timer(&mut self) -> &mut Delay {
|
||||
self.rate_limit_timer.get_or_insert(Delay::new(RECEIVE_RATE_LIMIT))
|
||||
}
|
||||
|
||||
/// Wait for `timer` if it exists, or be `Pending` forever.
|
||||
///
|
||||
/// Afterwards it gets set back to `None`.
|
||||
async fn wait_for_timer(&mut self) {
|
||||
match self.rate_limit_timer.as_mut() {
|
||||
None => pending().await,
|
||||
Some(timer) => timer.await,
|
||||
}
|
||||
self.rate_limit_timer = None;
|
||||
}
|
||||
}
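The queueing invariants documented for `PeerQueues` (bounded queues, no empty queues, one head per peer per round) can be tried out without timers or network types. The following is a synchronous sketch under stated assumptions: `SketchQueues`, `SKETCH_CAPACITY` and `pop_heads` are hypothetical names, `u32` stands in for both `AuthorityDiscoveryId` and the request, and the rate limit timer is left out entirely.

```rust
use std::collections::{hash_map::Entry, HashMap, VecDeque};

/// Hypothetical capacity, chosen to match the test value of `PEER_QUEUE_CAPACITY`.
pub const SKETCH_CAPACITY: usize = 2;

/// Per-peer FIFO queues; `u32` stands in for the authority id and for the request.
#[derive(Default)]
pub struct SketchQueues {
    queues: HashMap<u32, VecDeque<u32>>,
}

impl SketchQueues {
    /// Queue `req` for `peer`, handing it back if that peer's queue is full.
    pub fn push_req(&mut self, peer: u32, req: u32) -> Result<(), u32> {
        let queue = match self.queues.entry(peer) {
            Entry::Vacant(vacant) => vacant.insert(VecDeque::new()),
            Entry::Occupied(occupied) => {
                if occupied.get().len() >= SKETCH_CAPACITY {
                    return Err(req)
                }
                occupied.into_mut()
            },
        };
        queue.push_back(req);
        Ok(())
    }

    /// Take one request from every peer and drop queues that became empty.
    pub fn pop_heads(&mut self) -> Vec<u32> {
        let mut heads = Vec::with_capacity(self.queues.len());
        self.queues.retain(|_, queue| {
            heads.extend(queue.pop_front());
            !queue.is_empty()
        });
        heads
    }

    /// Cheap emptiness check, valid because empty queues are never kept.
    pub fn is_empty(&self) -> bool {
        self.queues.is_empty()
    }
}
```

Since each round yields at most one request per peer, a peer that fills its queue only delays its own later messages; other peers are still served in the same round.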
|
||||
@@ -14,10 +14,21 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::collections::{hash_map::Entry, HashMap, HashSet};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
pin::Pin,
|
||||
task::Poll,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
future::poll_fn,
|
||||
Future,
|
||||
};
|
||||
|
||||
use futures_timer::Delay;
|
||||
use indexmap::{map::Entry, IndexMap};
|
||||
use polkadot_node_network_protocol::request_response::v1::DisputeRequest;
|
||||
use polkadot_node_primitives::{CandidateVotes, DisputeMessage, SignedDisputeStatement};
|
||||
use polkadot_node_subsystem::{messages::DisputeCoordinatorMessage, overseer, ActiveLeavesUpdate};
|
||||
@@ -28,22 +39,27 @@ use polkadot_primitives::v2::{CandidateHash, DisputeStatement, Hash, SessionInde
|
||||
///
|
||||
/// It is going to spawn real tasks as it sees fit for getting the votes of the particular dispute
|
||||
/// out.
|
||||
///
|
||||
/// As we assume disputes have a priority, we start sending for disputes in the order
|
||||
/// `start_sender` got called.
|
||||
mod send_task;
|
||||
use send_task::SendTask;
|
||||
pub use send_task::TaskFinish;
|
||||
|
||||
/// Error and [`Result`] type for sender
|
||||
/// Error and [`Result`] type for sender.
|
||||
mod error;
|
||||
pub use error::{Error, FatalError, JfyiError, Result};
|
||||
|
||||
use self::error::JfyiErrorResult;
|
||||
use crate::{Metrics, LOG_TARGET};
|
||||
use crate::{Metrics, LOG_TARGET, SEND_RATE_LIMIT};
|
||||
|
||||
/// The `DisputeSender` keeps track of all ongoing disputes we need to send statements out.
|
||||
///
|
||||
/// For each dispute a `SendTask` is responsible for sending to the concerned validators for that
|
||||
/// particular dispute. The `DisputeSender` keeps track of those tasks, informs them about new
|
||||
/// sessions/validator sets and cleans them up when they become obsolete.
|
||||
///
|
||||
/// The unit of work for the `DisputeSender` is a dispute, represented by `SendTask`s.
|
||||
pub struct DisputeSender {
|
||||
/// All heads we currently consider active.
|
||||
active_heads: Vec<Hash>,
|
||||
@@ -54,11 +70,16 @@ pub struct DisputeSender {
|
||||
active_sessions: HashMap<SessionIndex, Hash>,
|
||||
|
||||
/// All ongoing dispute sendings this subsystem is aware of.
|
||||
disputes: HashMap<CandidateHash, SendTask>,
|
||||
///
|
||||
/// Using an `IndexMap` so items can be iterated in the order of insertion.
|
||||
disputes: IndexMap<CandidateHash, SendTask>,
|
||||
|
||||
/// Sender to be cloned for `SendTask`s.
|
||||
tx: mpsc::Sender<TaskFinish>,
|
||||
|
||||
/// Future for delaying too frequent creation of dispute sending tasks.
|
||||
rate_limit: RateLimit,
|
||||
|
||||
/// Metrics for reporting stats about sent requests.
|
||||
metrics: Metrics,
|
||||
}
|
||||
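The switch from `HashMap` to `IndexMap` relies on one property: entries are iterated in insertion order, and that order survives filtering. The sketch below illustrates the property with the standard library only. `OrderedMap` and its methods are hypothetical names for this illustration. It is not how `indexmap` is implemented, and its linear lookup would be a poor choice for large maps.

```rust
/// Hypothetical, minimal insertion-ordered map (illustration only).
pub struct OrderedMap<K, V> {
    entries: Vec<(K, V)>,
}

impl<K: PartialEq, V> OrderedMap<K, V> {
    pub fn new() -> Self {
        Self { entries: Vec::new() }
    }

    /// Insert only if the key is absent; returns whether an insert happened.
    pub fn insert_if_absent(&mut self, key: K, value: V) -> bool {
        if self.entries.iter().any(|(k, _)| *k == key) {
            return false;
        }
        self.entries.push((key, value));
        true
    }

    /// Drop entries rejected by `keep`; the survivors keep their relative order.
    pub fn retain(&mut self, mut keep: impl FnMut(&K, &mut V) -> bool) {
        self.entries.retain_mut(|(k, v)| keep(&*k, v));
    }

    /// Keys in insertion order.
    pub fn keys(&self) -> Vec<&K> {
        self.entries.iter().map(|(k, _)| k).collect()
    }
}
```

With such a structure, disputes that were started earlier are visited first on every pass, which matches the stated assumption that earlier disputes have priority.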
@@ -70,19 +91,25 @@ impl DisputeSender {
|
||||
Self {
|
||||
active_heads: Vec::new(),
|
||||
active_sessions: HashMap::new(),
|
||||
disputes: HashMap::new(),
|
||||
disputes: IndexMap::new(),
|
||||
tx,
|
||||
rate_limit: RateLimit::new(),
|
||||
metrics,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a `SendTask` for a particular new dispute.
|
||||
///
|
||||
/// This function is rate-limited by `SEND_RATE_LIMIT`. It will block if called too frequently
|
||||
/// in order to maintain the limit.
|
||||
pub async fn start_sender<Context>(
|
||||
&mut self,
|
||||
ctx: &mut Context,
|
||||
runtime: &mut RuntimeInfo,
|
||||
msg: DisputeMessage,
|
||||
) -> Result<()> {
|
||||
self.rate_limit.limit().await;
|
||||
|
||||
let req: DisputeRequest = msg.into();
|
||||
let candidate_hash = req.0.candidate_receipt.hash();
|
||||
match self.disputes.entry(candidate_hash) {
|
||||
@@ -112,6 +139,8 @@ impl DisputeSender {
|
||||
/// - Get new authorities to send messages to.
|
||||
/// - Get rid of obsolete tasks and disputes.
|
||||
/// - Get dispute sending started in case we missed one for some reason (e.g. on node startup)
|
||||
///
|
||||
/// This function enforces the `SEND_RATE_LIMIT`, therefore it might block.
|
||||
pub async fn update_leaves<Context>(
|
||||
&mut self,
|
||||
ctx: &mut Context,
|
||||
@@ -134,21 +163,38 @@ impl DisputeSender {
|
||||
|
||||
let active_disputes: HashSet<_> = active_disputes.into_iter().map(|(_, c)| c).collect();
|
||||
|
||||
// Cleanup obsolete senders:
|
||||
// Cleanup obsolete senders (retain keeps order of remaining elements):
|
||||
self.disputes
|
||||
.retain(|candidate_hash, _| active_disputes.contains(candidate_hash));
|
||||
|
||||
// Iterates in order of insertion:
|
||||
let mut should_rate_limit = true;
|
||||
for dispute in self.disputes.values_mut() {
|
||||
if have_new_sessions || dispute.has_failed_sends() {
|
||||
dispute
|
||||
if should_rate_limit {
|
||||
self.rate_limit.limit().await;
|
||||
}
|
||||
let sends_happened = dispute
|
||||
.refresh_sends(ctx, runtime, &self.active_sessions, &self.metrics)
|
||||
.await?;
|
||||
// Only rate limit if we actually sent something out _and_ it was not just because
|
||||
// of errors on previous sends.
|
||||
//
|
||||
// Reasoning: It would not be acceptable to slow down the whole subsystem, just
|
||||
// because of a few bad peers having problems. It is actually better to risk
|
||||
// running into their rate limit in that case and accept a minor reputation change.
|
||||
should_rate_limit = sends_happened && have_new_sessions;
|
||||
}
|
||||
}
|
||||
|
||||
// This should only be non-empty on startup, but if not - we got you covered:
|
||||
// This should only be non-empty on startup, but if not - we got you covered.
|
||||
//
|
||||
// Initial order will not be maintained in that case, but that should be fine as disputes
|
||||
// recovered at startup will be relatively "old" anyway and we assume that no more than a
|
||||
// third of the validators will go offline at any point in time.
|
||||
for dispute in unknown_disputes {
|
||||
self.start_send_for_dispute(ctx, runtime, dispute).await?
|
||||
self.rate_limit.limit().await;
|
||||
self.start_send_for_dispute(ctx, runtime, dispute).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
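The rate-limit decision inside the `update_leaves` loop is easy to misread in diff form, so here is a small model of it. It only counts how often the limiter would be awaited. `DisputeState` and `rate_limit_waits` are hypothetical names, and the `sends_happened` field stands in for the `bool` returned by `refresh_sends`.

```rust
/// Hypothetical summary of one dispute as seen by the loop.
#[derive(Clone, Copy)]
pub struct DisputeState {
    /// Corresponds to `dispute.has_failed_sends()`.
    pub has_failed_sends: bool,
    /// Stand-in for the result of `refresh_sends`: were new requests sent?
    pub sends_happened: bool,
}

/// Count how many times the rate limit would be awaited for one pass.
pub fn rate_limit_waits(have_new_sessions: bool, disputes: &[DisputeState]) -> usize {
    let mut waits = 0;
    let mut should_rate_limit = true;
    for dispute in disputes {
        if have_new_sessions || dispute.has_failed_sends {
            if should_rate_limit {
                waits += 1;
            }
            // Only rate limit the next dispute if this one sent something
            // and the sends were caused by new sessions, not by retries.
            should_rate_limit = dispute.sends_happened && have_new_sessions;
        }
    }
    waits
}
```

In this model a pass that only retries failed sends waits at most once, while a session change with sends for every dispute waits once per dispute.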
@@ -317,6 +363,46 @@ impl DisputeSender {
|
||||
}
|
||||
}
|
||||
|
||||
/// Rate limiting logic.
///
/// Suitable for the sending side.
struct RateLimit {
    limit: Delay,
}

impl RateLimit {
    /// Create new `RateLimit` that is immediately ready.
    fn new() -> Self {
        // Start with an empty duration, as there has not been any previous call.
        Self { limit: Delay::new(Duration::new(0, 0)) }
    }

    /// Initialized with actual `SEND_RATE_LIMIT` duration.
    fn new_limit() -> Self {
        Self { limit: Delay::new(SEND_RATE_LIMIT) }
    }

    /// Wait until ready and prepare for next call.
    async fn limit(&mut self) {
        // Wait for rate limit and add some logging:
        poll_fn(|cx| {
            let old_limit = Pin::new(&mut self.limit);
            match old_limit.poll(cx) {
                Poll::Pending => {
                    gum::debug!(
                        target: LOG_TARGET,
                        "Sending rate limit hit, slowing down requests"
                    );
                    Poll::Pending
                },
                Poll::Ready(()) => Poll::Ready(()),
            }
        })
        .await;
        *self = Self::new_limit();
    }
}
|
||||
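The behaviour of `RateLimit` (first call passes immediately, later calls wait out the remainder of the interval, then the interval restarts) can be sketched without an async runtime. The version below is a blocking, std-only analogue. The commit's version is async and built on `futures_timer::Delay` and `poll_fn`. `BlockingRateLimit` and its `interval` parameter are assumptions made for this illustration.

```rust
use std::time::{Duration, Instant};

/// Hypothetical blocking analogue of the sender-side rate limit.
pub struct BlockingRateLimit {
    interval: Duration,
    /// Earliest point in time at which the next call may proceed.
    ready_at: Instant,
}

impl BlockingRateLimit {
    /// Create a limiter that is immediately ready, as there has not been
    /// any previous call.
    pub fn new(interval: Duration) -> Self {
        Self { interval, ready_at: Instant::now() }
    }

    /// Block until ready, then arm the limiter for the next call.
    pub fn limit(&mut self) {
        // `checked_duration_since` yields `None` if `ready_at` already passed.
        if let Some(wait) = self.ready_at.checked_duration_since(Instant::now()) {
            if !wait.is_zero() {
                std::thread::sleep(wait);
            }
        }
        self.ready_at = Instant::now() + self.interval;
    }
}
```

As in the commit, the interval is measured from the moment a call is let through, so time spent doing work between two calls counts towards the limit.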
|
||||
/// Retrieve the currently active sessions.
|
||||
///
|
||||
/// List is all indices of all active sessions together with the head that was used for the query.
|
||||
|
||||
@@ -42,13 +42,15 @@ use crate::{
|
||||
/// Delivery status for a particular dispute.
|
||||
///
|
||||
/// Keeps track of all the validators that have to be reached for a dispute.
|
||||
///
|
||||
/// The unit of work for a `SendTask` is an authority/validator.
|
||||
pub struct SendTask {
|
||||
/// The request we are supposed to get out to all parachain validators of the dispute's session
|
||||
/// The request we are supposed to get out to all `parachain` validators of the dispute's session
|
||||
/// and to all current authorities.
|
||||
request: DisputeRequest,
|
||||
|
||||
/// The set of authorities we need to send our messages to. This set will change at session
|
||||
/// boundaries. It will always be at least the parachain validators of the session where the
|
||||
/// boundaries. It will always be at least the `parachain` validators of the session where the
|
||||
/// dispute happened and the authorities of the current sessions as determined by active heads.
|
||||
deliveries: HashMap<AuthorityDiscoveryId, DeliveryStatus>,
|
||||
|
||||
@@ -100,6 +102,10 @@ impl TaskResult {
|
||||
#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
|
||||
impl SendTask {
|
||||
/// Initiates sending a dispute message to peers.
|
||||
///
|
||||
/// Creation of new `SendTask`s is subject to rate limiting. Each `SendTask` will trigger
|
||||
/// sending a message to each validator, hence for employing a per-peer rate limit, we need to
|
||||
/// limit the construction of new `SendTask`s.
|
||||
pub async fn new<Context>(
|
||||
ctx: &mut Context,
|
||||
runtime: &mut RuntimeInfo,
|
||||
@@ -118,15 +124,22 @@ impl SendTask {
|
||||
///
|
||||
/// This function is called at construction and should also be called whenever a session change
|
||||
/// happens and on a regular basis to ensure we are retrying failed attempts.
|
||||
///
|
||||
/// This might resend to validators and is thus subject to any rate limiting we might want.
|
||||
/// Calls to this function for different instances should be rate limited according to
|
||||
/// `SEND_RATE_LIMIT`.
|
||||
///
|
||||
/// Returns: `true` if this call resulted in new requests.
|
||||
pub async fn refresh_sends<Context>(
|
||||
&mut self,
|
||||
ctx: &mut Context,
|
||||
runtime: &mut RuntimeInfo,
|
||||
active_sessions: &HashMap<SessionIndex, Hash>,
|
||||
metrics: &Metrics,
|
||||
) -> Result<()> {
|
||||
) -> Result<bool> {
|
||||
let new_authorities = self.get_relevant_validators(ctx, runtime, active_sessions).await?;
|
||||
|
||||
// Note this will also contain all authorities for which sending failed previously:
|
||||
let add_authorities = new_authorities
|
||||
.iter()
|
||||
.filter(|a| !self.deliveries.contains_key(a))
|
||||
@@ -141,12 +154,14 @@ impl SendTask {
|
||||
send_requests(ctx, self.tx.clone(), add_authorities, self.request.clone(), metrics)
|
||||
.await?;
|
||||
|
||||
let was_empty = new_statuses.is_empty();
|
||||
|
||||
self.has_failed_sends = false;
|
||||
self.deliveries.extend(new_statuses.into_iter());
|
||||
Ok(())
|
||||
Ok(!was_empty)
|
||||
}
|
||||
|
||||
/// Whether any sends have failed since the last refreshed.
|
||||
/// Whether any sends have failed since the last refresh.
|
||||
pub fn has_failed_sends(&self) -> bool {
|
||||
self.has_failed_sends
|
||||
}
|
||||
@@ -193,9 +208,8 @@ impl SendTask {
|
||||
|
||||
/// Determine all validators that should receive the given dispute requests.
|
||||
///
|
||||
/// This is all parachain validators of the session the candidate occurred and all authorities
|
||||
/// This is all `parachain` validators of the session the candidate occurred and all authorities
|
||||
/// of all currently active sessions, determined by currently active heads.
|
||||
|
||||
async fn get_relevant_validators<Context>(
|
||||
&self,
|
||||
ctx: &mut Context,
|
||||
@@ -293,7 +307,7 @@ async fn wait_response_task(
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
%err,
|
||||
"Failed to notify susystem about dispute sending result."
|
||||
"Failed to notify subsystem about dispute sending result."
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::Arc,
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
@@ -38,6 +39,8 @@ use polkadot_primitives::v2::{
|
||||
};
|
||||
use polkadot_primitives_test_helpers::dummy_candidate_descriptor;
|
||||
|
||||
use crate::LOG_TARGET;
|
||||
|
||||
pub const MOCK_SESSION_INDEX: SessionIndex = 1;
|
||||
pub const MOCK_NEXT_SESSION_INDEX: SessionIndex = 2;
|
||||
pub const MOCK_VALIDATORS: [Sr25519Keyring; 6] = [
|
||||
@@ -54,6 +57,8 @@ pub const MOCK_AUTHORITIES_NEXT_SESSION: [Sr25519Keyring; 2] =
|
||||
|
||||
pub const FERDIE_INDEX: ValidatorIndex = ValidatorIndex(0);
|
||||
pub const ALICE_INDEX: ValidatorIndex = ValidatorIndex(1);
|
||||
pub const BOB_INDEX: ValidatorIndex = ValidatorIndex(2);
|
||||
pub const CHARLIE_INDEX: ValidatorIndex = ValidatorIndex(3);
|
||||
|
||||
lazy_static! {
|
||||
|
||||
@@ -148,12 +153,22 @@ pub async fn make_dispute_message(
|
||||
invalid_validator: ValidatorIndex,
|
||||
) -> DisputeMessage {
|
||||
let candidate_hash = candidate.hash();
|
||||
let before_request = Instant::now();
|
||||
let valid_vote =
|
||||
make_explicit_signed(MOCK_VALIDATORS[valid_validator.0 as usize], candidate_hash, true)
|
||||
.await;
|
||||
gum::trace!(
|
||||
"Passed time for valid vote: {:#?}",
|
||||
Instant::now().saturating_duration_since(before_request)
|
||||
);
|
||||
let before_request = Instant::now();
|
||||
let invalid_vote =
|
||||
make_explicit_signed(MOCK_VALIDATORS[invalid_validator.0 as usize], candidate_hash, false)
|
||||
.await;
|
||||
gum::trace!(
|
||||
"Passed time for invalid vote: {:#?}",
|
||||
Instant::now().saturating_duration_since(before_request)
|
||||
);
|
||||
DisputeMessage::from_signed_statements(
|
||||
valid_vote,
|
||||
valid_validator,
|
||||
@@ -206,10 +221,15 @@ impl AuthorityDiscovery for MockAuthorityDiscovery {
|
||||
) -> Option<HashSet<polkadot_primitives::v2::AuthorityDiscoveryId>> {
|
||||
for (a, p) in self.peer_ids.iter() {
|
||||
if p == &peer_id {
|
||||
return Some(HashSet::from([MOCK_VALIDATORS_DISCOVERY_KEYS
|
||||
.get(&a)
|
||||
.unwrap()
|
||||
.clone()]))
|
||||
let result =
|
||||
HashSet::from([MOCK_VALIDATORS_DISCOVERY_KEYS.get(&a).unwrap().clone()]);
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
%peer_id,
|
||||
?result,
|
||||
"Returning authority ids for peer id"
|
||||
);
|
||||
return Some(result)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,12 +17,17 @@
|
||||
|
||||
//! Subsystem unit tests
|
||||
|
||||
use std::{collections::HashSet, sync::Arc, task::Poll, time::Duration};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
sync::Arc,
|
||||
task::Poll,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
future::poll_fn,
|
||||
future::{poll_fn, ready},
|
||||
pin_mut, Future, SinkExt,
|
||||
};
|
||||
use futures_timer::Delay;
|
||||
@@ -52,7 +57,7 @@ use polkadot_node_subsystem_test_helpers::{
|
||||
mock::make_ferdie_keystore, subsystem_test_harness, TestSubsystemContextHandle,
|
||||
};
|
||||
use polkadot_primitives::v2::{
|
||||
AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, SessionInfo,
|
||||
AuthorityDiscoveryId, CandidateHash, CandidateReceipt, Hash, SessionIndex, SessionInfo,
|
||||
};
|
||||
|
||||
use self::mock::{
|
||||
@@ -60,7 +65,11 @@ use self::mock::{
|
||||
MOCK_AUTHORITY_DISCOVERY, MOCK_NEXT_SESSION_INDEX, MOCK_NEXT_SESSION_INFO, MOCK_SESSION_INDEX,
|
||||
MOCK_SESSION_INFO,
|
||||
};
|
||||
use crate::{DisputeDistributionSubsystem, Metrics, LOG_TARGET};
|
||||
use crate::{
|
||||
receiver::BATCH_COLLECTING_INTERVAL,
|
||||
tests::mock::{BOB_INDEX, CHARLIE_INDEX},
|
||||
DisputeDistributionSubsystem, Metrics, LOG_TARGET, SEND_RATE_LIMIT,
|
||||
};
|
||||
|
||||
/// Useful mock providers.
|
||||
pub mod mock;
|
||||
@@ -72,49 +81,108 @@ fn send_dispute_sends_dispute() {
|
||||
|
||||
let relay_parent = Hash::random();
|
||||
let candidate = make_candidate_receipt(relay_parent);
|
||||
let message = make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX).await;
|
||||
handle
|
||||
.send(FromOrchestra::Communication {
|
||||
msg: DisputeDistributionMessage::SendDispute(message.clone()),
|
||||
})
|
||||
.await;
|
||||
// Requests needed session info:
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::RuntimeApi(
|
||||
RuntimeApiMessage::Request(
|
||||
hash,
|
||||
RuntimeApiRequest::SessionInfo(session_index, tx)
|
||||
)
|
||||
) => {
|
||||
assert_eq!(session_index, MOCK_SESSION_INDEX);
|
||||
assert_eq!(
|
||||
hash,
|
||||
message.candidate_receipt().descriptor.relay_parent
|
||||
);
|
||||
tx.send(Ok(Some(MOCK_SESSION_INFO.clone()))).expect("Receiver should stay alive.");
|
||||
}
|
||||
);
|
||||
|
||||
let expected_receivers = {
|
||||
let info = &MOCK_SESSION_INFO;
|
||||
info.discovery_keys
|
||||
.clone()
|
||||
.into_iter()
|
||||
.filter(|a| a != &Sr25519Keyring::Ferdie.public().into())
|
||||
.collect()
|
||||
// All validators are also authorities in the first session, so we are
|
||||
// done here.
|
||||
};
|
||||
check_sent_requests(&mut handle, expected_receivers, true).await;
|
||||
|
||||
send_dispute(&mut handle, candidate, true).await;
|
||||
conclude(&mut handle).await;
|
||||
};
|
||||
test_harness(test);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn received_request_triggers_import() {
|
||||
fn send_honors_rate_limit() {
|
||||
sp_tracing::try_init_simple();
|
||||
let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>, _req_cfg| async move {
|
||||
let _ = handle_subsystem_startup(&mut handle, None).await;
|
||||
|
||||
let relay_parent = Hash::random();
|
||||
let candidate = make_candidate_receipt(relay_parent);
|
||||
let before_request = Instant::now();
|
||||
send_dispute(&mut handle, candidate, true).await;
|
||||
// First send should not be rate limited:
|
||||
gum::trace!("Passed time: {:#?}", Instant::now().saturating_duration_since(before_request));
|
||||
// This test would likely be flaky on CI:
|
||||
//assert!(Instant::now().saturating_duration_since(before_request) < SEND_RATE_LIMIT);
|
||||
|
||||
let relay_parent = Hash::random();
|
||||
let candidate = make_candidate_receipt(relay_parent);
|
||||
send_dispute(&mut handle, candidate, false).await;
|
||||
// Second send should be rate limited:
|
||||
gum::trace!(
|
||||
"Passed time for send_dispute: {:#?}",
|
||||
Instant::now().saturating_duration_since(before_request)
|
||||
);
|
||||
assert!(Instant::now() - before_request >= SEND_RATE_LIMIT);
|
||||
conclude(&mut handle).await;
|
||||
};
|
||||
test_harness(test);
|
||||
}
|
||||
|
||||
/// Helper for sending a new dispute to dispute-distribution sender and handling resulting messages.
|
||||
async fn send_dispute(
|
||||
handle: &mut TestSubsystemContextHandle<DisputeDistributionMessage>,
|
||||
candidate: CandidateReceipt,
|
||||
needs_session_info: bool,
|
||||
) {
|
||||
let before_request = Instant::now();
|
||||
let message = make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX).await;
|
||||
gum::trace!(
|
||||
"Passed time for making message: {:#?}",
|
||||
Instant::now().saturating_duration_since(before_request)
|
||||
);
|
||||
let before_request = Instant::now();
|
||||
handle
|
||||
.send(FromOrchestra::Communication {
|
||||
msg: DisputeDistributionMessage::SendDispute(message.clone()),
|
||||
})
|
||||
.await;
|
||||
gum::trace!(
|
||||
"Passed time for sending message: {:#?}",
|
||||
Instant::now().saturating_duration_since(before_request)
|
||||
);
|
||||
if needs_session_info {
|
||||
// Requests needed session info:
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::RuntimeApi(
|
||||
RuntimeApiMessage::Request(
|
||||
hash,
|
||||
RuntimeApiRequest::SessionInfo(session_index, tx)
|
||||
)
|
||||
) => {
|
||||
assert_eq!(session_index, MOCK_SESSION_INDEX);
|
||||
assert_eq!(
|
||||
hash,
|
||||
message.candidate_receipt().descriptor.relay_parent
|
||||
);
|
||||
tx.send(Ok(Some(MOCK_SESSION_INFO.clone()))).expect("Receiver should stay alive.");
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
let expected_receivers = {
|
||||
let info = &MOCK_SESSION_INFO;
|
||||
info.discovery_keys
|
||||
.clone()
|
||||
.into_iter()
|
||||
.filter(|a| a != &Sr25519Keyring::Ferdie.public().into())
|
||||
.collect()
|
||||
// All validators are also authorities in the first session, so we are
|
||||
// done here.
|
||||
};
|
||||
check_sent_requests(handle, expected_receivers, true).await;
|
||||
}
|
||||
|
||||
// Things to test:
|
||||
// x Request triggers import
|
||||
// x Subsequent imports get batched
|
||||
// x Batch gets flushed.
|
||||
// x Batch gets renewed.
|
||||
// x Non authority requests get dropped.
|
||||
// x Sending rate limit is honored.
|
||||
// x Receiving rate limit is honored.
|
||||
// x Duplicate requests on batch are dropped
|
||||
|
||||
#[test]
|
||||
fn received_non_authorities_are_dropped() {
|
||||
let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>,
|
||||
mut req_cfg: RequestResponseConfig| async move {
|
||||
let req_tx = req_cfg.inbound_queue.as_mut().unwrap();
|
||||
@@ -140,110 +208,271 @@ fn received_request_triggers_import() {
|
||||
assert_eq!(reputation_changes.len(), 1);
|
||||
}
|
||||
);
|
||||
conclude(&mut handle).await;
|
||||
};
|
||||
test_harness(test);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn received_request_triggers_import() {
|
||||
let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>,
|
||||
mut req_cfg: RequestResponseConfig| async move {
|
||||
let req_tx = req_cfg.inbound_queue.as_mut().unwrap();
|
||||
let _ = handle_subsystem_startup(&mut handle, None).await;
|
||||
|
||||
let relay_parent = Hash::random();
|
||||
let candidate = make_candidate_receipt(relay_parent);
|
||||
let message = make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX).await;
|
||||
|
||||
// Nested valid and invalid import.
|
||||
//
|
||||
// Nested requests from same peer should get dropped. For the invalid request even
|
||||
// subsequent requests should get dropped.
|
||||
nested_network_dispute_request(
|
||||
&mut handle,
|
||||
req_tx,
|
||||
MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Alice),
|
||||
message.clone().into(),
|
||||
ImportStatementsResult::InvalidImport,
|
||||
ImportStatementsResult::ValidImport,
|
||||
true,
|
||||
move |handle, req_tx, message| {
|
||||
nested_network_dispute_request(
|
||||
handle,
|
||||
req_tx,
|
||||
MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob),
|
||||
message.clone().into(),
|
||||
ImportStatementsResult::ValidImport,
|
||||
false,
|
||||
move |_, req_tx, message| async move {
|
||||
// Another request from Alice should get dropped (request already in
|
||||
// flight):
|
||||
{
|
||||
let rx_response = send_network_dispute_request(
|
||||
req_tx,
|
||||
MOCK_AUTHORITY_DISCOVERY
|
||||
.get_peer_id_by_authority(Sr25519Keyring::Alice),
|
||||
message.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_matches!(
|
||||
rx_response.await,
|
||||
Err(err) => {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?err,
|
||||
"Request got dropped - other request already in flight"
|
||||
);
|
||||
}
|
||||
);
|
||||
}
|
||||
// Another request from Bob should get dropped (request already in
|
||||
// flight):
|
||||
{
|
||||
let rx_response = send_network_dispute_request(
|
||||
req_tx,
|
||||
MOCK_AUTHORITY_DISCOVERY
|
||||
.get_peer_id_by_authority(Sr25519Keyring::Bob),
|
||||
message.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_matches!(
|
||||
rx_response.await,
|
||||
Err(err) => {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?err,
|
||||
"Request got dropped - other request already in flight"
|
||||
);
|
||||
}
|
||||
);
|
||||
}
|
||||
},
|
||||
)
|
||||
},
|
||||
move |_handle, _req_tx, _message| ready(()),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Subsequent sends from Alice should fail (peer is banned):
|
||||
{
|
||||
let rx_response = send_network_dispute_request(
|
||||
req_tx,
|
||||
MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Alice),
|
||||
message.clone().into(),
|
||||
)
|
||||
.await;
|
||||
gum::trace!(target: LOG_TARGET, "Concluding.");
|
||||
conclude(&mut handle).await;
|
||||
};
|
||||
test_harness(test);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn batching_works() {
|
||||
let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>,
|
||||
mut req_cfg: RequestResponseConfig| async move {
|
||||
let req_tx = req_cfg.inbound_queue.as_mut().unwrap();
|
||||
let _ = handle_subsystem_startup(&mut handle, None).await;
|
||||
|
||||
let relay_parent = Hash::random();
|
||||
let candidate = make_candidate_receipt(relay_parent);
|
||||
let message = make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX).await;
|
||||
|
||||
// Initial request should get forwarded immediately:
|
||||
nested_network_dispute_request(
|
||||
&mut handle,
|
||||
req_tx,
|
||||
MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Alice),
|
||||
message.clone().into(),
|
||||
ImportStatementsResult::ValidImport,
|
||||
true,
|
||||
move |_handle, _req_tx, _message| ready(()),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut rx_responses = Vec::new();
|
||||
|
||||
let message = make_dispute_message(candidate.clone(), BOB_INDEX, FERDIE_INDEX).await;
|
||||
let peer = MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob);
|
||||
rx_responses.push(send_network_dispute_request(req_tx, peer, message.clone().into()).await);
|
||||
|
||||
let message = make_dispute_message(candidate.clone(), CHARLIE_INDEX, FERDIE_INDEX).await;
|
||||
let peer = MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Charlie);
|
||||
rx_responses.push(send_network_dispute_request(req_tx, peer, message.clone().into()).await);
|
||||
gum::trace!("Imported 3 votes into batch");
|
||||
|
||||
Delay::new(BATCH_COLLECTING_INTERVAL).await;
|
||||
gum::trace!("Batch should still be alive");
|
||||
// Batch should still be alive (2 new votes):
|
||||
// Let's import two more votes, but full duplicates - should not extend batch life.
|
||||
gum::trace!("Importing duplicate votes");
|
||||
let mut rx_responses_duplicate = Vec::new();
|
||||
let message = make_dispute_message(candidate.clone(), BOB_INDEX, FERDIE_INDEX).await;
|
||||
let peer = MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob);
|
||||
rx_responses_duplicate
|
||||
.push(send_network_dispute_request(req_tx, peer, message.clone().into()).await);
|
||||
|
||||
let message = make_dispute_message(candidate.clone(), CHARLIE_INDEX, FERDIE_INDEX).await;
|
||||
let peer = MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Charlie);
|
||||
rx_responses_duplicate
|
||||
.push(send_network_dispute_request(req_tx, peer, message.clone().into()).await);
|
||||
|
||||
for rx_response in rx_responses_duplicate {
|
||||
assert_matches!(
|
||||
rx_response.await,
|
||||
Err(err) => {
|
||||
Ok(resp) => {
|
||||
let sc_network::config::OutgoingResponse {
|
||||
result,
|
||||
reputation_changes,
|
||||
sent_feedback: _,
|
||||
} = resp;
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?err,
|
||||
"Request got dropped - peer is banned."
|
||||
);
|
||||
?reputation_changes,
|
||||
"Received reputation changes."
|
||||
);
|
||||
// We don't punish on that.
|
||||
assert_eq!(reputation_changes.len(), 0);
|
||||
|
||||
assert_matches!(result, Err(()));
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// But should work fine for Bob:
|
||||
Delay::new(BATCH_COLLECTING_INTERVAL).await;
|
||||
gum::trace!("Batch should be ready now (only duplicates have been added)");
|
||||
|
||||
let pending_confirmation = assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::DisputeCoordinator(
|
||||
DisputeCoordinatorMessage::ImportStatements {
|
||||
candidate_receipt: _,
|
||||
session,
|
||||
statements,
|
||||
pending_confirmation: Some(pending_confirmation),
|
||||
}
|
||||
) => {
|
||||
assert_eq!(session, MOCK_SESSION_INDEX);
|
||||
assert_eq!(statements.len(), 3);
|
||||
pending_confirmation
|
||||
}
|
||||
);
|
||||
pending_confirmation.send(ImportStatementsResult::ValidImport).unwrap();
|
||||
|
||||
for rx_response in rx_responses {
|
||||
assert_matches!(
|
||||
rx_response.await,
|
||||
Ok(resp) => {
|
||||
let sc_network::config::OutgoingResponse {
|
||||
result,
|
||||
reputation_changes: _,
|
||||
sent_feedback,
|
||||
} = resp;
|
||||
|
||||
let result = result.unwrap();
|
||||
let decoded =
|
||||
<DisputeResponse as Decode>::decode(&mut result.as_slice()).unwrap();
|
||||
|
||||
assert!(decoded == DisputeResponse::Confirmed);
|
||||
if let Some(sent_feedback) = sent_feedback {
|
||||
sent_feedback.send(()).unwrap();
|
||||
}
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
"Valid import happened."
|
||||
);
|
||||
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
gum::trace!(target: LOG_TARGET, "Concluding.");
|
||||
conclude(&mut handle).await;
|
||||
};
|
||||
test_harness(test);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn receive_rate_limit_is_enforced() {
|
||||
let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>,
|
||||
mut req_cfg: RequestResponseConfig| async move {
|
||||
let req_tx = req_cfg.inbound_queue.as_mut().unwrap();
|
||||
let _ = handle_subsystem_startup(&mut handle, None).await;
|
||||
|
||||
let relay_parent = Hash::random();
|
||||
let candidate = make_candidate_receipt(relay_parent);
|
||||
let message = make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX).await;
|
||||
|
||||
// Initial request should get forwarded immediately:
|
||||
nested_network_dispute_request(
|
||||
&mut handle,
|
||||
req_tx,
|
||||
MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob),
|
||||
MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Alice),
|
||||
message.clone().into(),
|
||||
ImportStatementsResult::ValidImport,
|
||||
false,
|
||||
|_, _, _| async {},
|
||||
true,
|
||||
move |_handle, _req_tx, _message| ready(()),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut rx_responses = Vec::new();
|
||||
|
||||
let peer = MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob);
|
||||
|
||||
let message = make_dispute_message(candidate.clone(), BOB_INDEX, FERDIE_INDEX).await;
|
||||
rx_responses.push(send_network_dispute_request(req_tx, peer, message.clone().into()).await);
|
||||
|
||||
let message = make_dispute_message(candidate.clone(), CHARLIE_INDEX, FERDIE_INDEX).await;
|
||||
rx_responses.push(send_network_dispute_request(req_tx, peer, message.clone().into()).await);
|
||||
|
||||
gum::trace!("Import one too many:");
|
||||
|
||||
let message = make_dispute_message(candidate.clone(), CHARLIE_INDEX, ALICE_INDEX).await;
|
||||
let rx_response_flood =
|
||||
send_network_dispute_request(req_tx, peer, message.clone().into()).await;
|
||||
|
||||
assert_matches!(
|
||||
rx_response_flood.await,
|
||||
Ok(resp) => {
|
||||
let sc_network::config::OutgoingResponse {
|
||||
result: _,
|
||||
reputation_changes,
|
||||
sent_feedback: _,
|
||||
} = resp;
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?reputation_changes,
|
||||
"Received reputation changes."
|
||||
);
|
||||
// Received punishment for flood:
|
||||
assert_eq!(reputation_changes.len(), 1);
|
||||
}
|
||||
);
|
||||
gum::trace!("Need to wait 2 batch intervals:");
|
||||
Delay::new(BATCH_COLLECTING_INTERVAL).await;
|
||||
Delay::new(BATCH_COLLECTING_INTERVAL).await;
|
||||
|
||||
gum::trace!("Batch should be ready now");
|
||||
|
||||
let pending_confirmation = assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::DisputeCoordinator(
|
||||
DisputeCoordinatorMessage::ImportStatements {
|
||||
candidate_receipt: _,
|
||||
session,
|
||||
statements,
|
||||
pending_confirmation: Some(pending_confirmation),
|
||||
}
|
||||
) => {
|
||||
assert_eq!(session, MOCK_SESSION_INDEX);
|
||||
// Only 3 as fourth was flood:
|
||||
assert_eq!(statements.len(), 3);
|
||||
pending_confirmation
|
||||
}
|
||||
);
|
||||
pending_confirmation.send(ImportStatementsResult::ValidImport).unwrap();
|
||||
|
||||
for rx_response in rx_responses {
|
||||
assert_matches!(
|
||||
rx_response.await,
|
||||
Ok(resp) => {
|
||||
let sc_network::config::OutgoingResponse {
|
||||
result,
|
||||
reputation_changes: _,
|
||||
sent_feedback,
|
||||
} = resp;
|
||||
|
||||
let result = result.unwrap();
|
||||
let decoded =
|
||||
<DisputeResponse as Decode>::decode(&mut result.as_slice()).unwrap();
|
||||
|
||||
assert!(decoded == DisputeResponse::Confirmed);
|
||||
if let Some(sent_feedback) = sent_feedback {
|
||||
sent_feedback.send(()).unwrap();
|
||||
}
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
"Valid import happened."
|
||||
);
|
||||
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
gum::trace!(target: LOG_TARGET, "Concluding.");
|
||||
conclude(&mut handle).await;
|
||||
};
|
||||
|
||||
@@ -121,6 +121,10 @@ const POV_RESPONSE_SIZE: u64 = MAX_POV_SIZE as u64 + 10_000;
|
||||
/// This is `MAX_CODE_SIZE` plus some additional space for protocol overhead.
|
||||
const STATEMENT_RESPONSE_SIZE: u64 = MAX_CODE_SIZE as u64 + 10_000;
|
||||
|
||||
/// We can have relative large timeouts here, there is no value of hitting a
|
||||
/// timeout as we want to get statements through to each node in any case.
|
||||
pub const DISPUTE_REQUEST_TIMEOUT: Duration = Duration::from_secs(12);
|
||||
|
||||
impl Protocol {
|
||||
/// Get a configuration for a given Request response protocol.
|
||||
///
|
||||
@@ -194,9 +198,7 @@ impl Protocol {
|
||||
/// Responses are just confirmation, in essence not even a bit. So 100 seems
|
||||
/// plenty.
|
||||
max_response_size: 100,
|
||||
/// We can have relative large timeouts here, there is no value of hitting a
|
||||
/// timeout as we want to get statements through to each node in any case.
|
||||
request_timeout: Duration::from_secs(12),
|
||||
request_timeout: DISPUTE_REQUEST_TIMEOUT,
|
||||
inbound_queue: Some(tx),
|
||||
},
|
||||
};
|
||||
|
||||
@@ -15,6 +15,13 @@ This design should result in a protocol that is:
|
||||
|
||||
## Protocol
|
||||
|
||||
Distributing disputes needs to be a reliable protocol. We would like to make as
|
||||
sure as possible that our vote got properly delivered to all concerned
|
||||
validators. For this to work, this subsystem won't be gossip based, but instead
|
||||
will use a request/response protocol for application level confirmations. The
|
||||
request will be the payload (the actual votes/statements), the response will
|
||||
be the confirmation. See [below](#wire-format).
|
||||
|
||||
### Input
|
||||
|
||||
[`DisputeDistributionMessage`][DisputeDistributionMessage]
|
||||
@@ -107,16 +114,7 @@ struct VotesResponse {
|
||||
}
|
||||
```
|
||||
|
||||
## Functionality
|
||||
|
||||
Distributing disputes needs to be a reliable protocol. We would like to make as
|
||||
sure as possible that our vote got properly delivered to all concerned
|
||||
validators. For this to work, this subsystem won't be gossip based, but instead
|
||||
will use a request/response protocol for application level confirmations. The
|
||||
request will be the payload (the actual votes/statements), the response will
|
||||
be the confirmation. See [above][#wire-format].
|
||||
|
||||
### Starting a Dispute
|
||||
## Starting a Dispute
|
||||
|
||||
A dispute is initiated once a node sends the first `DisputeRequest` wire message,
|
||||
which must contain an "invalid" vote and a "valid" vote.
|
||||
@@ -132,7 +130,7 @@ conflicting votes available, hence we have a valid dispute. Nodes will still
|
||||
need to check whether the disputing votes are somewhat current and not some
|
||||
stale ones.
|
||||
|
||||
### Participating in a Dispute
|
||||
## Participating in a Dispute
|
||||
|
||||
Upon receiving a `DisputeRequest` message, a dispute distribution will trigger the
|
||||
import of the received votes via the dispute coordinator
|
||||
@@ -144,13 +142,13 @@ except that if the local node deemed the candidate valid, the `SendDispute`
|
||||
message will contain a valid vote signed by our node and will contain the
|
||||
initially received `Invalid` vote.
|
||||
|
||||
Note, that we rely on the coordinator to check availability for spam protection
|
||||
(see below).
|
||||
Note that we rely on the `dispute-coordinator` to check the validity of a dispute for spam
|
||||
protection (see below).
|
||||
|
||||
### Sending of messages
|
||||
## Sending of messages
|
||||
|
||||
Starting and participating in a dispute are pretty similar from the perspective
|
||||
of dispute distribution. Once we receive a `SendDispute` message we try to make
|
||||
of dispute distribution. Once we receive a `SendDispute` message, we try to make
|
||||
sure to get the data out. We keep track of all the parachain validators that
|
||||
should see the message, which are all the parachain validators of the session
|
||||
where the dispute happened as they will want to participate in the dispute. In
|
||||
@@ -159,114 +157,185 @@ session (which might be the same or not and may change during the dispute).
|
||||
Those authorities will not participate in the dispute, but need to see the
|
||||
statements so they can include them in blocks.
|
||||
|
||||
We keep track of connected parachain validators and authorities and will issue
|
||||
warnings in the logs if connected nodes are less than two thirds of the
|
||||
corresponding sets. We also only consider a message transmitted, once we
|
||||
received a confirmation message. If not, we will keep retrying getting that
|
||||
message out as long as the dispute is deemed alive. To determine whether a
|
||||
dispute is still alive we will issue a
|
||||
### Reliability
|
||||
|
||||
We only consider a message transmitted once we have received a confirmation message.
|
||||
If not, we will keep retrying getting that message out as long as the dispute is
|
||||
deemed alive. To determine whether a dispute is still alive we will ask the
|
||||
`dispute-coordinator` for a list of all still active disputes via a
|
||||
`DisputeCoordinatorMessage::ActiveDisputes` message before each retry run. Once
|
||||
a dispute is no longer live, we will clean up the state accordingly.
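The retry behaviour described above can be sketched as follows. This is a minimal illustration, not the subsystem's actual code: `PendingSends`, its methods, and the `u64`/`u32` stand-ins for candidate hashes and authority ids are assumptions made for the example. The `active` set stands for the answer to `DisputeCoordinatorMessage::ActiveDisputes`.

```rust
use std::collections::{HashMap, HashSet};

// Stand-in types for illustration only (assumptions, not the real types).
type CandidateHash = u64;
type AuthorityId = u32;

/// Tracks, per dispute, which receivers have not yet confirmed our message.
#[derive(Default)]
pub struct PendingSends {
    unconfirmed: HashMap<CandidateHash, HashSet<AuthorityId>>,
}

impl PendingSends {
    /// Start sending a dispute to the given receivers.
    pub fn start(&mut self, candidate: CandidateHash, receivers: impl IntoIterator<Item = AuthorityId>) {
        self.unconfirmed.entry(candidate).or_default().extend(receivers);
    }

    /// A receiver confirmed: the message counts as transmitted for that receiver.
    pub fn on_confirmed(&mut self, candidate: CandidateHash, from: AuthorityId) {
        if let Some(receivers) = self.unconfirmed.get_mut(&candidate) {
            receivers.remove(&from);
        }
    }

    /// One retry run: drop state for disputes that are no longer active and
    /// return the (dispute, receiver) pairs that still need a (re)send.
    pub fn retry_run(&mut self, active: &HashSet<CandidateHash>) -> Vec<(CandidateHash, AuthorityId)> {
        self.unconfirmed.retain(|candidate, _| active.contains(candidate));
        let mut todo: Vec<_> = self
            .unconfirmed
            .iter()
            .flat_map(|(candidate, receivers)| receivers.iter().map(move |r| (*candidate, *r)))
            .collect();
        // Sorted only to make the output deterministic in this sketch.
        todo.sort_unstable();
        todo
    }
}
```

The sketch does not preserve the order of importance of disputes; a real implementation would need an order-preserving map for that.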
|
||||
|
||||
### Reception & Spam Considerations
|
||||
### Order
|
||||
|
||||
Because we are not forwarding foreign statements, spam is less of an issue in
|
||||
comparison to gossip based systems. Rate limiting should be implemented at the
|
||||
substrate level, see
|
||||
[#7750](https://github.com/paritytech/substrate/issues/7750). Still we should
|
||||
make sure that it is not possible via spamming to prevent a dispute concluding
|
||||
or worse from getting noticed.
|
||||
We assume `SendDispute` messages are coming in an order of importance, hence
|
||||
`dispute-distribution` will make sure to send out network messages in the same
|
||||
order, even on retry.
|
||||
|
||||
Considered attack vectors:
|
||||
### Rate Limit
|
||||
|
||||
1. Invalid disputes (candidate does not exist) could make us
|
||||
run out of resources. E.g. if we recorded every statement, we could run out
|
||||
of disk space eventually.
|
||||
2. An attacker can just flood us with notifications on any notification
|
||||
protocol, assuming flood protection is not effective enough, our unbounded
|
||||
buffers can fill up and we will run out of memory eventually.
|
||||
3. An attacker could participate in a valid dispute, but send its votes multiple
|
||||
times.
|
||||
4. Attackers could spam us at a high rate with invalid disputes. Our incoming
|
||||
queue of requests could get monopolized by those malicious requests and we
|
||||
won't be able to import any valid disputes and we could run out of resources,
|
||||
if we tried to process them all in parallel.
|
||||
For spam protection (see below), we employ an artificial rate limiting on sending
|
||||
out messages in order to not hit the rate limit at the receiving side, which
|
||||
would result in our messages getting dropped and our reputation getting reduced.
|
||||
|
||||
For tackling 1, we make sure to not occupy resources before we don't know a
|
||||
candidate is available. So we will not record statements to disk until we
|
||||
recovered availability for the candidate or know by some other means that the
|
||||
dispute is legit.
|
||||
## Reception
|
||||
|
||||
For 2, we will pick up on any dispute on restart, so assuming that any realistic
|
||||
memory filling attack will take some time, we should be able to participate in a
|
||||
dispute under such attacks.
|
||||
As we shall see, the receiving side is mostly about handling spam and ensuring
|
||||
the dispute-coordinator learns about disputes as fast as possible.
|
||||
|
||||
Importing/discarding redundant votes should be pretty quick, so measures with
|
||||
regards to 4 should suffice to prevent 3, from doing any real harm.
|
||||
Goals for the receiving side:
|
||||
|
||||
For 4, full monopolization of the incoming queue should not be possible assuming
|
||||
substrate handles incoming requests in a somewhat fair way. Still we want some
|
||||
defense mechanisms, at the very least we need to make sure to not exhaust
|
||||
resources.
|
||||
1. Get new disputes to the dispute-coordinator as fast as possible, so
|
||||
prioritization can happen properly.
|
||||
2. Batch votes per dispute as much as possible for good import performance.
|
||||
3. Prevent malicious nodes exhausting node resources by sending lots of messages.
|
||||
4. Prevent malicious nodes from sending so many messages/(fake) disputes,
|
||||
preventing us from concluding good ones.
|
||||
5. Limit the ability of malicious nodes to delay the vote import due to batching
|
||||
logic.
|
||||
|
||||
The dispute coordinator will notify us on import about unavailable candidates or
|
||||
otherwise invalid imports and we can disconnect from such peers/decrease their
|
||||
reputation drastically. This alone should get us quite far with regards to queue
|
||||
monopolization, as availability recovery is expected to fail relatively quickly
|
||||
for unavailable data.
|
||||
Goals 1 and 2 seem to conflict, but an easy compromise is possible: When
|
||||
learning about a new dispute, we will import the vote immediately, making the
|
||||
dispute coordinator aware and also getting immediate feedback on the validity.
|
||||
Then if valid we can batch further incoming votes, with less time constraints as
|
||||
the dispute-coordinator already knows about the dispute.
|
||||
|
||||
Still if those spam messages come at a very high rate, we might still run out of
|
||||
resources if we immediately call `DisputeCoordinatorMessage::ImportStatements`
|
||||
on each one of them. Secondly with our assumption of 1/3 dishonest validators,
|
||||
getting rid of all of them will take some time, depending on reputation timeouts
|
||||
some of them might even be able to reconnect eventually.
|
||||
Goals 3 and 4 are closely related, and both can be solved via rate
|
||||
limiting as we shall see below. Rate limits should already be implemented at the
|
||||
substrate level, but [are not](https://github.com/paritytech/substrate/issues/7750)
|
||||
at the time of writing. But even if they were, the enforced substrate limits would
|
||||
likely not be configurable and thus would still be too high for our needs as we can
|
||||
rely on the following observations:
|
||||
|
||||
To mitigate those issues we will process dispute messages with a maximum
|
||||
parallelism `N`. We initiate import processes for up to `N` candidates in
|
||||
parallel. Once we reached `N` parallel requests we will start back pressuring on
|
||||
the incoming requests. This saves us from resource exhaustion.
|
||||
1. Each honest validator will only send one message (apart from duplicates on
|
||||
timeout) per candidate/dispute.
|
||||
2. An honest validator needs to fully recover availability and validate the
|
||||
candidate for casting a vote.
|
||||
|
||||
To reduce impact of malicious nodes further, we can keep track from which nodes the
|
||||
currently importing statements came from and will drop requests from nodes that
|
||||
already have imports in flight.
|
||||
With these two observations, we can conclude that honest validators will usually
|
||||
not send messages at a high rate. We can therefore enforce conservative rate
|
||||
limits and thus minimize harm spamming malicious nodes can have.
|
||||
|
||||
Honest nodes are not expected to send dispute statements at a high rate, but
|
||||
even if they did:
|
||||
Before we dive into how rate limiting solves all spam issues elegantly, let's
|
||||
discuss that honest behaviour further:
|
||||
|
||||
- we will import at least the first one and if it is valid it will trigger a
|
||||
dispute, preventing finality.
|
||||
- Chances are good that the first sent candidate from a peer is indeed the
|
||||
oldest one (if they differ in age at all).
|
||||
- for the dropped request any honest node will retry sending.
|
||||
- there will be other nodes notifying us about that dispute as well.
|
||||
- honest votes have a speed advantage on average. Apart from the very first
|
||||
dispute statement for a candidate, which might cause the availability recovery
|
||||
process, imports of honest votes will be super fast, while for spam imports
|
||||
they will always take some time as we have to wait for availability to fail.
|
||||
What about session changes? Here we might have to inform a new validator set of
|
||||
lots of already existing disputes at once.
|
||||
|
||||
So this general rate limit, that we drop requests from same peers if they come
|
||||
faster than we can import the statements should not cause any problems for
|
||||
honest nodes and is in their favor.
|
||||
With observation 1) and a rate limit that is per peer, we are still good:
|
||||
|
||||
Size of `N`: The larger `N` the better we can handle distributed flood attacks
|
||||
(see previous paragraph), but we also get potentially more availability recovery
|
||||
processes happening at the same time, which slows down the individual processes.
|
||||
And we rather want to have one finish quickly than lots slowly at the same time.
|
||||
On the other hand, valid disputes are expected to be rare, so if we ever exhaust
|
||||
`N` it is very likely that this is caused by spam and spam recoveries don't cost
|
||||
too much bandwidth due to empty responses.
|
||||
Let's assume a rate limit of one message per 200ms per sender. This means 5
|
||||
messages from each validator per second. 5 messages means 5 disputes!
|
||||
Consequently, we will be able to conclude 5 disputes per second - no matter what
|
||||
malicious actors are doing. This is assuming dispute messages are sent ordered,
|
||||
but even if not perfectly ordered: On average it will be 5 disputes per second.
|
||||
|
||||
Considering that an attacker would need to attack many nodes in parallel to have
|
||||
any effect, an `N` of 10 seems to be a good compromise. For honest requests, most
|
||||
of those imports will likely concern the same candidate, and for dishonest ones
|
||||
we get to disconnect from up to ten colluding adversaries at a time.
|
||||
This is good enough! All those disputes are valid ones and will result in
|
||||
slashing and disabling of validators. Let's assume all of them conclude `valid`,
|
||||
and we disable validators only after 100 raised concluding valid disputes, we
|
||||
would still start disabling misbehaving validators in only 20 seconds.
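The arithmetic in this argument can be written out as a small sketch. The function names are made up for illustration; the values 200ms and 100 disputes are the assumptions stated in the text, not constants from the code base.

```rust
/// Disputes that can be imported per second from one sender, given a
/// per-sender rate limit of one message every `rate_limit_ms` milliseconds
/// and one dispute per message.
pub fn disputes_per_second(rate_limit_ms: u64) -> u64 {
    1000 / rate_limit_ms
}

/// Seconds until disabling starts, if validators are disabled only after
/// `disputes_before_disabling` disputes have concluded.
pub fn seconds_until_disabling(rate_limit_ms: u64, disputes_before_disabling: u64) -> u64 {
    disputes_before_disabling * rate_limit_ms / 1000
}
```

With a rate limit of 200ms this gives 5 disputes per second, and with a threshold of 100 disputes it gives the 20 seconds quoted above.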
|
||||
|
||||
For the size of the channel for incoming requests: Due to dropping of repeated
|
||||
requests from same nodes we can make the channel relatively large without fear
|
||||
of lots of spam requests sitting there wasting our time, even after we already
|
||||
blocked a peer. For valid disputes, incoming requests can become bursty. On the
|
||||
other hand we will also be very quick in processing them. A channel size of 100
|
||||
requests seems plenty and should be able to handle bursts adequately.
|
||||
One could also think that in addition participation is expected to take longer,
|
||||
which means on average we can import/conclude disputes faster than they are
|
||||
generated - regardless of dispute spam. Unfortunately this is not necessarily
|
||||
true: There might be parachains with very light load where recovery and
|
||||
validation can be accomplished very quickly - maybe faster than we can import
|
||||
those disputes.
|
||||
|
||||
This is probably an argument for not imposing a too low rate limit, although the
|
||||
issue is more general: Even without any rate limit, if an attacker generates
|
||||
disputes at a very high rate, nodes will be having trouble keeping participation
|
||||
up, hence the problem should be mitigated at a [more fundamental
|
||||
layer](https://github.com/paritytech/polkadot/issues/5898).
|
||||
|
||||
For nodes that have been offline for a while, the same argument as for session
|
||||
changes holds, but matters even less: We assume 2/3 of nodes to be online, so
|
||||
even if the worst case 1/3 offline happens and they could not import votes fast
|
||||
enough (as argued above, they in fact can) it would not matter for consensus.
|
||||
|
||||
### Rate Limiting
|
||||
|
||||
As suggested previously, rate limiting allows us to mitigate all threats that come
|
||||
from malicious actors trying to overwhelm the system in order to get away without
|
||||
a slash, when it comes to dispute-distribution. In this section we will explain
|
||||
how in greater detail.
|
||||
|
||||
The idea is to open a queue with limited size for each peer. We will process
|
||||
incoming messages as fast as we can by doing the following:
|
||||
|
||||
1. Check that the sending peer is actually a valid authority - otherwise drop
|
||||
message and decrease reputation/disconnect.
|
||||
2. Put message on the peer's queue, if queue is full - drop it.
|
||||
|
||||
Every `RATE_LIMIT` seconds (or rather milliseconds), we pause processing
|
||||
incoming requests to go a full circle and process one message from each queue.
|
||||
Processing means `Batching` as explained in the next section.
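A minimal sketch of the per-peer queues, under stated assumptions: the queue capacity of 10 is an illustrative value, the type is generic over a peer id `P` and a message `M`, and the authority check from step 1 is assumed to have happened before `push` is called. It is not the receiver's actual implementation, which also has to keep a stable order between peers (a plain `HashMap` iterates in arbitrary order).

```rust
use std::collections::{HashMap, VecDeque};
use std::hash::Hash;

/// Hypothetical cap on queued requests per peer (illustrative value).
pub const PEER_QUEUE_CAPACITY: usize = 10;

/// One bounded queue per sending peer.
pub struct PeerQueues<P, M> {
    queues: HashMap<P, VecDeque<M>>,
}

impl<P: Hash + Eq + Clone, M> PeerQueues<P, M> {
    pub fn new() -> Self {
        Self { queues: HashMap::new() }
    }

    /// Step 2: put the message on the peer's queue. If the queue is full,
    /// the message is handed back so the caller can drop it (and punish the peer).
    pub fn push(&mut self, peer: P, msg: M) -> Result<(), M> {
        let queue = self.queues.entry(peer).or_default();
        if queue.len() >= PEER_QUEUE_CAPACITY {
            return Err(msg);
        }
        queue.push_back(msg);
        Ok(())
    }

    /// Called once per `RATE_LIMIT` tick: go a full circle and take one
    /// message from each non-empty queue.
    pub fn pop_round(&mut self) -> Vec<(P, M)> {
        let mut round = Vec::new();
        for (peer, queue) in self.queues.iter_mut() {
            if let Some(msg) = queue.pop_front() {
                round.push((peer.clone(), msg));
            }
        }
        // Forget peers with nothing queued, so memory follows active peers only.
        self.queues.retain(|_, queue| !queue.is_empty());
        round
    }
}
```

Because each peer contributes at most one message per tick, a flooding peer can only fill its own queue and cannot delay messages from other peers.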
|
||||
|
||||
### Batching
|
||||
|
||||
To achieve goal 2 we will batch incoming votes/messages together before passing
|
||||
them on as a single batch to the `dispute-coordinator`. To adhere to goal 1 as
|
||||
well, we will do the following:
|
||||
|
||||
1. For an incoming message, we check whether we have an existing batch for that
|
||||
candidate, if not we import directly to the dispute-coordinator, as we have
|
||||
to assume this is concerning a new dispute.
|
||||
2. We open a batch and start collecting incoming messages for that candidate,
|
||||
instead of immediately forwarding.
|
||||
3. We keep collecting votes in the batch until we receive fewer than
   `MIN_KEEP_BATCH_ALIVE_VOTES` unique votes in the last `BATCH_COLLECTING_INTERVAL`. This is
   important to accommodate goal 5 and also goal 3.
4. We send the whole batch to the dispute-coordinator.
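The steps above can be sketched as follows. This is a simplified illustration, not the actual receiver code: votes are reduced to validator indices, candidates to a `u64` stand-in, and a single global `on_interval` call replaces per-batch timers. `Batches`, `Action` and both method names are hypothetical; the constant value is the one assumed in the example further down.

```rust
use std::collections::{HashMap, HashSet};

/// Illustrative value, taken from the worked example in the text.
pub const MIN_KEEP_BATCH_ALIVE_VOTES: usize = 10;

// Stand-in types for illustration only.
type ValidatorIndex = u32;
type CandidateHash = u64;

#[derive(Default)]
struct Batch {
    votes: HashSet<ValidatorIndex>,
    new_since_tick: usize,
}

#[derive(Debug, PartialEq)]
pub enum Action {
    /// No batch existed: import right away, this may be a new dispute.
    ImportNow(Vec<ValidatorIndex>),
    /// Votes were added to an existing batch.
    Batched,
}

#[derive(Default)]
pub struct Batches {
    open: HashMap<CandidateHash, Batch>,
}

impl Batches {
    pub fn on_message(&mut self, candidate: CandidateHash, votes: Vec<ValidatorIndex>) -> Action {
        match self.open.get_mut(&candidate) {
            None => {
                // First message for this candidate: open a batch for what follows.
                self.open.insert(candidate, Batch::default());
                Action::ImportNow(votes)
            }
            Some(batch) => {
                for vote in votes {
                    // Only unique votes count towards keeping the batch alive.
                    if batch.votes.insert(vote) {
                        batch.new_since_tick += 1;
                    }
                }
                Action::Batched
            }
        }
    }

    /// Called every `BATCH_COLLECTING_INTERVAL`: batches that did not receive
    /// enough new unique votes are closed and returned for import.
    pub fn on_interval(&mut self) -> Vec<(CandidateHash, Vec<ValidatorIndex>)> {
        let mut ready = Vec::new();
        self.open.retain(|candidate, batch| {
            if batch.new_since_tick >= MIN_KEEP_BATCH_ALIVE_VOTES {
                batch.new_since_tick = 0;
                return true;
            }
            let mut votes: Vec<_> = batch.votes.drain().collect();
            votes.sort_unstable();
            if !votes.is_empty() {
                ready.push((*candidate, votes));
            }
            false
        });
        ready
    }
}
```

Closing a batch once the vote rate drops keeps an attacker from holding votes back indefinitely, since keeping a batch open requires a steady supply of new unique votes.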
|
||||
|
||||
This together with rate limiting explained above ensures we will be able to
|
||||
process valid disputes: We can limit the number of simultaneous existing batches
|
||||
to some high value, but can be rather certain that this limit will never be
|
||||
reached - hence we won't drop valid disputes:
|
||||
|
||||
Let's assume `MIN_KEEP_BATCH_ALIVE_VOTES` is 10, `BATCH_COLLECTING_INTERVAL`
|
||||
is `500ms` and above `RATE_LIMIT` is `100ms`. 1/3 of validators are malicious,
|
||||
so for 1000 this means around 330 malicious actors worst case.
|
||||
|
||||
All those actors can send a message every `100ms`, that is 10 per second. This
|
||||
means at the beginning of an attack they can open up around 3300 batches. Each
|
||||
containing two votes. So memory usage is still negligible. In reality it is even
|
||||
less, as we also demand 10 new votes to trickle in per batch in order to keep it
|
||||
alive, every `500ms`. Hence for the first second, each batch requires 20 votes
|
||||
each. Each message is 2 votes, so this means 10 messages per batch. Hence to
|
||||
keep those batches alive 10 attackers are needed for each batch. This reduces
|
||||
the number of opened batches by a factor of 10: So we only have 330 batches in 1
|
||||
second - each containing 20 votes.
|
||||
|
||||
The next second: In order to further grow memory usage, attackers have to
|
||||
maintain 10 messages per batch and second. Number of batches equals the number
|
||||
of attackers, each has 10 messages per second, all are needed to maintain the
|
||||
batches in memory. Therefore we have a hard cap of around 330 (number of
|
||||
malicious nodes) open batches. Each can be filled with one vote per malicious
actor. So 330 batches with 330 votes each: Let's assume approximately 100
|
||||
bytes per signature/vote. This results in a worst case memory usage of 330 * 330
|
||||
* 100 ~= 10 MiB.
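The worst-case estimate can be reproduced with a short sketch. The function name is made up for illustration, and the inputs (330 malicious validators, about 100 bytes per vote) are the assumptions from the text.

```rust
/// Worst-case memory held in open batches, in bytes: at most one batch per
/// malicious validator, each holding at most one vote per malicious validator.
pub fn worst_case_batch_memory(malicious_validators: u64, bytes_per_vote: u64) -> u64 {
    malicious_validators * malicious_validators * bytes_per_vote
}
```

For 330 malicious validators this yields 10,890,000 bytes, roughly 10 MiB. The bound grows quadratically with the number of malicious validators, so ten times as many of them cost a hundred times the memory.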
|
||||
|
||||
For 10_000 validators, we are already in the Gigabyte range, which means that
|
||||
with a validator set that large we might want to be more strict with the rate limit or
|
||||
require a larger rate of incoming votes per batch to keep them alive.
|
||||
|
||||
For a thousand validators a limit on batches of around 1000 should never be
|
||||
reached in practice. Hence, due to rate limiting, we have a very good chance of
never having to drop a potentially valid dispute due to some resource limit.
|
||||
|
||||
Further safeguards are possible: The dispute-coordinator actually
|
||||
confirms/denies imports. So once we receive a denial by the dispute-coordinator
|
||||
for the initial imported votes, we can opt into flushing the batch immediately
|
||||
and importing the votes. This swaps memory usage for more CPU usage, but if that
|
||||
import is deemed invalid again we can immediately decrease the reputation of the
|
||||
sending peers, so this should be a net win. For the time being we punt on this
|
||||
for simplicity.
|
||||
|
||||
Instead of filling batches to maximize memory usage, attackers could also try to
|
||||
overwhelm the dispute coordinator by only sending votes for new candidates all
|
||||
the time. This attack vector is mitigated also by above rate limit and
|
||||
decreasing the peer's reputation on denial of the invalid imports by the
|
||||
coordinator.
|
||||
|
||||
### Node Startup
|
||||
|
||||
|
||||