Dispute spam protection (#4134)

* Mostly notes.

* Better error messages.

* Introduce Fatal/NonFatal + drop back channel participation

- Fatal/NonFatal - in order to make it easier to use utility functions.
- We drop the back channel in dispute participation as it won't be
needed any more.

* Better error messages.

* Utility function for receiving `CandidateEvent`s.

* Ordering module typechecks.

* cargo fmt

* Prepare spam slots module.

* Implement SpamSlots mechanism.

* Implement queues.

* cargo fmt

* Participation.

* Participation taking shape.

* Finish participation.

* cargo fmt

* Cleanup.

* WIP: Cleanup + Integration.

* Make `RollingSessionWindow` initialized by default.

* Make approval voting typecheck.

* Get rid of lazy_static & fix approval voting tests

* Move `SessionWindowSize` to node primitives.

* Implement dispute coordinator initialization.

* cargo fmt

* Make queues return error instead of boolean.

* Initialized: WIP

* Introduce chain api for getting finalized block.

* Fix ordering to only prune candidates on finalized events.

* Pruning of old sessions in spam slots.

* New import logic.

* Make everything typecheck.

* Fix warnings.

* Get rid of obsolete dispute-participation.

* Fixes.

* Add back accidentelly deleted Cargo.lock

* Deliver disputes in an ordered fashion.

* Add module docs for errors

* Use type synonym.

* hidden docs.

* Fix overseer tests.

* Ordering provider taking `CandidateReceipt`.

... To be kicked on one next commit.

* Fix ordering to use relay_parent

as included block is not unique per candidate.

* Add comment in ordering.rs.

* Take care of duplicate entries in queues.

* Better spam slots.

* Review remarks + docs.

* Fix db tests.

* Participation tests.

* Also scrape votes on first leaf for good measure.

* Make tests typecheck.

* Spelling.

* Only participate in actual disputes, not on every import.

* Don't account backing votes to spam slots.

* Fix more tests.

* Don't participate if we don't have keys.

* Fix tests, typos and warnings.

* Fix merge error.

* Spelling fixes.

* Add missing docs.

* Queue tests.

* More tests.

* Add metrics + don't short circuit import.

* Basic test for ordering provider.

* Import fix.

* Remove dead link.

* One more dead link.

Co-authored-by: Lldenaurois <Ljdenaurois@gmail.com>
This commit is contained in:
Robert Klotzner
2021-11-19 18:08:21 +01:00
committed by GitHub
parent ef3addb6a2
commit 25974f2076
45 changed files with 4099 additions and 2621 deletions
@@ -19,6 +19,7 @@
//! This is useful for consensus components which need to stay up-to-date about recent sessions but don't
//! care about the state of particular blocks.
pub use polkadot_node_primitives::{new_session_window_size, SessionWindowSize};
use polkadot_primitives::v1::{Hash, SessionIndex, SessionInfo};
use futures::channel::oneshot;
@@ -27,6 +28,7 @@ use polkadot_node_subsystem::{
messages::{RuntimeApiMessage, RuntimeApiRequest},
overseer, SubsystemContext,
};
use thiserror::Error;
/// Sessions unavailable in state to cache.
#[derive(Debug)]
@@ -51,7 +53,7 @@ pub struct SessionsUnavailableInfo {
}
/// Sessions were unavailable to fetch from the state for some reason.
#[derive(Debug)]
#[derive(Debug, Error)]
pub struct SessionsUnavailable {
/// The error kind.
kind: SessionsUnavailableKind,
@@ -59,16 +61,15 @@ pub struct SessionsUnavailable {
info: Option<SessionsUnavailableInfo>,
}
impl core::fmt::Display for SessionsUnavailable {
fn fmt(&self, f: &mut core::fmt::Formatter) -> Result<(), core::fmt::Error> {
write!(f, "Sessions unavailable: {:?}, info: {:?}", self.kind, self.info)
}
}
/// An indicated update of the rolling session window.
#[derive(Debug, PartialEq, Clone)]
pub enum SessionWindowUpdate {
/// The session window was just initialized to the current values.
Initialized {
/// The start of the window (inclusive).
window_start: SessionIndex,
/// The end of the window (inclusive).
window_end: SessionIndex,
},
/// The session window was just advanced from one range to a new one.
Advanced {
/// The previous start of the window (inclusive).
@@ -85,49 +86,63 @@ pub enum SessionWindowUpdate {
}
/// A rolling window of sessions and cached session info.
#[derive(Default)]
pub struct RollingSessionWindow {
earliest_session: Option<SessionIndex>,
earliest_session: SessionIndex,
session_info: Vec<SessionInfo>,
window_size: SessionIndex,
window_size: SessionWindowSize,
}
impl RollingSessionWindow {
/// Initialize a new session info cache with the given window size.
pub fn new(window_size: SessionIndex) -> Self {
RollingSessionWindow { earliest_session: None, session_info: Vec::new(), window_size }
pub async fn new(
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
window_size: SessionWindowSize,
block_hash: Hash,
) -> Result<Self, SessionsUnavailable> {
let session_index = get_session_index_for_head(ctx, block_hash).await?;
let window_start = session_index.saturating_sub(window_size.get() - 1);
match load_all_sessions(ctx, block_hash, window_start, session_index).await {
Err(kind) => Err(SessionsUnavailable {
kind,
info: Some(SessionsUnavailableInfo {
window_start,
window_end: session_index,
block_hash,
}),
}),
Ok(s) => Ok(Self { earliest_session: window_start, session_info: s, window_size }),
}
}
/// Initialize a new session info cache with the given window size and
/// initial data.
pub fn with_session_info(
window_size: SessionIndex,
window_size: SessionWindowSize,
earliest_session: SessionIndex,
session_info: Vec<SessionInfo>,
) -> Self {
RollingSessionWindow { earliest_session: Some(earliest_session), session_info, window_size }
RollingSessionWindow { earliest_session, session_info, window_size }
}
/// Access the session info for the given session index, if stored within the window.
pub fn session_info(&self, index: SessionIndex) -> Option<&SessionInfo> {
self.earliest_session.and_then(|earliest| {
if index < earliest {
None
} else {
self.session_info.get((index - earliest) as usize)
}
})
if index < self.earliest_session {
None
} else {
self.session_info.get((index - self.earliest_session) as usize)
}
}
/// Access the index of the earliest session, if the window is not empty.
pub fn earliest_session(&self) -> Option<SessionIndex> {
self.earliest_session.clone()
}
/// Access the index of the latest session, if the window is not empty.
pub fn latest_session(&self) -> Option<SessionIndex> {
/// Access the index of the earliest session.
pub fn earliest_session(&self) -> SessionIndex {
self.earliest_session
.map(|earliest| earliest + (self.session_info.len() as SessionIndex).saturating_sub(1))
}
/// Access the index of the latest session.
pub fn latest_session(&self) -> SessionIndex {
self.earliest_session + (self.session_info.len() as SessionIndex).saturating_sub(1)
}
/// When inspecting a new import notification, updates the session info cache to match
@@ -142,116 +157,86 @@ impl RollingSessionWindow {
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
block_hash: Hash,
) -> Result<SessionWindowUpdate, SessionsUnavailable> {
if self.window_size == 0 {
let session_index = get_session_index_for_head(ctx, block_hash).await?;
let old_window_start = self.earliest_session;
let latest = self.latest_session();
// Either cached or ancient.
if session_index <= latest {
return Ok(SessionWindowUpdate::Unchanged)
}
let session_index = {
let (s_tx, s_rx) = oneshot::channel();
let old_window_end = latest;
// We're requesting session index of a child to populate the cache in advance.
ctx.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::SessionIndexForChild(s_tx),
))
.await;
let window_start = session_index.saturating_sub(self.window_size.get() - 1);
match s_rx.await {
Ok(Ok(s)) => s,
Ok(Err(e)) =>
return Err(SessionsUnavailable {
kind: SessionsUnavailableKind::RuntimeApi(e),
info: None,
}),
Err(e) =>
return Err(SessionsUnavailable {
kind: SessionsUnavailableKind::RuntimeApiUnavailable(e),
info: None,
}),
}
};
// keep some of the old window, if applicable.
let overlap_start = window_start.saturating_sub(old_window_start);
match self.earliest_session {
None => {
// First block processed on start-up.
let fresh_start = if latest < window_start { window_start } else { latest + 1 };
let window_start = session_index.saturating_sub(self.window_size - 1);
match load_all_sessions(ctx, block_hash, fresh_start, session_index).await {
Err(kind) => Err(SessionsUnavailable {
kind,
info: Some(SessionsUnavailableInfo {
window_start: fresh_start,
window_end: session_index,
block_hash,
}),
}),
Ok(s) => {
let update = SessionWindowUpdate::Advanced {
prev_window_start: old_window_start,
prev_window_end: old_window_end,
new_window_start: window_start,
new_window_end: session_index,
};
match load_all_sessions(ctx, block_hash, window_start, session_index).await {
Err(kind) => Err(SessionsUnavailable {
kind,
info: Some(SessionsUnavailableInfo {
window_start,
window_end: session_index,
block_hash,
}),
}),
Ok(s) => {
let update = SessionWindowUpdate::Initialized {
window_start,
window_end: session_index,
};
let outdated = std::cmp::min(overlap_start as usize, self.session_info.len());
self.session_info.drain(..outdated);
self.session_info.extend(s);
// we need to account for this case:
// window_start ................................... session_index
// old_window_start ........... latest
let new_earliest = std::cmp::max(window_start, old_window_start);
self.earliest_session = new_earliest;
self.earliest_session = Some(window_start);
self.session_info = s;
Ok(update)
},
}
},
Some(old_window_start) => {
let latest =
self.latest_session().expect("latest always exists if earliest does; qed");
// Either cached or ancient.
if session_index <= latest {
return Ok(SessionWindowUpdate::Unchanged)
}
let old_window_end = latest;
let window_start = session_index.saturating_sub(self.window_size - 1);
// keep some of the old window, if applicable.
let overlap_start = window_start.saturating_sub(old_window_start);
let fresh_start = if latest < window_start { window_start } else { latest + 1 };
match load_all_sessions(ctx, block_hash, fresh_start, session_index).await {
Err(kind) => Err(SessionsUnavailable {
kind,
info: Some(SessionsUnavailableInfo {
window_start: fresh_start,
window_end: session_index,
block_hash,
}),
}),
Ok(s) => {
let update = SessionWindowUpdate::Advanced {
prev_window_start: old_window_start,
prev_window_end: old_window_end,
new_window_start: window_start,
new_window_end: session_index,
};
let outdated =
std::cmp::min(overlap_start as usize, self.session_info.len());
self.session_info.drain(..outdated);
self.session_info.extend(s);
// we need to account for this case:
// window_start ................................... session_index
// old_window_start ........... latest
let new_earliest = std::cmp::max(window_start, old_window_start);
self.earliest_session = Some(new_earliest);
Ok(update)
},
}
Ok(update)
},
}
}
}
async fn get_session_index_for_head(
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
block_hash: Hash,
) -> Result<SessionIndex, SessionsUnavailable> {
let (s_tx, s_rx) = oneshot::channel();
// We're requesting session index of a child to populate the cache in advance.
ctx.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::SessionIndexForChild(s_tx),
))
.await;
match s_rx.await {
Ok(Ok(s)) => Ok(s),
Ok(Err(e)) =>
return Err(SessionsUnavailable {
kind: SessionsUnavailableKind::RuntimeApi(e),
info: None,
}),
Err(e) =>
return Err(SessionsUnavailable {
kind: SessionsUnavailableKind::RuntimeApiUnavailable(e),
info: None,
}),
}
}
async fn load_all_sessions(
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
block_hash: Hash,
@@ -289,7 +274,7 @@ mod tests {
use polkadot_primitives::v1::Header;
use sp_core::testing::TaskExecutor;
const TEST_WINDOW_SIZE: SessionIndex = 6;
pub const TEST_WINDOW_SIZE: SessionWindowSize = new_session_window_size!(6);
fn dummy_session_info(index: SessionIndex) -> SessionInfo {
SessionInfo {
@@ -309,7 +294,7 @@ mod tests {
fn cache_session_info_test(
expected_start_session: SessionIndex,
session: SessionIndex,
mut window: RollingSessionWindow,
window: Option<RollingSessionWindow>,
expect_requests_from: SessionIndex,
) {
let header = Header {
@@ -328,9 +313,15 @@ mod tests {
let test_fut = {
Box::pin(async move {
window.cache_session_info_for_head(&mut ctx, hash).await.unwrap();
assert_eq!(window.earliest_session, Some(expected_start_session));
let window = match window {
None =>
RollingSessionWindow::new(&mut ctx, TEST_WINDOW_SIZE, hash).await.unwrap(),
Some(mut window) => {
window.cache_session_info_for_head(&mut ctx, hash).await.unwrap();
window
},
};
assert_eq!(window.earliest_session, expected_start_session);
assert_eq!(
window.session_info,
(expected_start_session..=session).map(dummy_session_info).collect::<Vec<_>>(),
@@ -370,34 +361,34 @@ mod tests {
#[test]
fn cache_session_info_first_early() {
cache_session_info_test(0, 1, RollingSessionWindow::new(TEST_WINDOW_SIZE), 0);
cache_session_info_test(0, 1, None, 0);
}
#[test]
fn cache_session_info_does_not_underflow() {
let window = RollingSessionWindow {
earliest_session: Some(1),
earliest_session: 1,
session_info: vec![dummy_session_info(1)],
window_size: TEST_WINDOW_SIZE,
};
cache_session_info_test(1, 2, window, 2);
cache_session_info_test(1, 2, Some(window), 2);
}
#[test]
fn cache_session_info_first_late() {
cache_session_info_test(
(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE - 1),
(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE.get() - 1),
100,
RollingSessionWindow::new(TEST_WINDOW_SIZE),
(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE - 1),
None,
(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE.get() - 1),
);
}
#[test]
fn cache_session_info_jump() {
let window = RollingSessionWindow {
earliest_session: Some(50),
earliest_session: 50,
session_info: vec![
dummy_session_info(50),
dummy_session_info(51),
@@ -407,43 +398,43 @@ mod tests {
};
cache_session_info_test(
(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE - 1),
(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE.get() - 1),
100,
window,
(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE - 1),
Some(window),
(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE.get() - 1),
);
}
#[test]
fn cache_session_info_roll_full() {
let start = 99 - (TEST_WINDOW_SIZE - 1);
let start = 99 - (TEST_WINDOW_SIZE.get() - 1);
let window = RollingSessionWindow {
earliest_session: Some(start),
earliest_session: start,
session_info: (start..=99).map(dummy_session_info).collect(),
window_size: TEST_WINDOW_SIZE,
};
cache_session_info_test(
(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE - 1),
(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE.get() - 1),
100,
window,
Some(window),
100, // should only make one request.
);
}
#[test]
fn cache_session_info_roll_many_full() {
let start = 97 - (TEST_WINDOW_SIZE - 1);
let start = 97 - (TEST_WINDOW_SIZE.get() - 1);
let window = RollingSessionWindow {
earliest_session: Some(start),
earliest_session: start,
session_info: (start..=97).map(dummy_session_info).collect(),
window_size: TEST_WINDOW_SIZE,
};
cache_session_info_test(
(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE - 1),
(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE.get() - 1),
100,
window,
Some(window),
98,
);
}
@@ -452,13 +443,16 @@ mod tests {
fn cache_session_info_roll_early() {
let start = 0;
let window = RollingSessionWindow {
earliest_session: Some(start),
earliest_session: start,
session_info: (0..=1).map(dummy_session_info).collect(),
window_size: TEST_WINDOW_SIZE,
};
cache_session_info_test(
0, 2, window, 2, // should only make one request.
0,
2,
Some(window),
2, // should only make one request.
);
}
@@ -466,18 +460,18 @@ mod tests {
fn cache_session_info_roll_many_early() {
let start = 0;
let window = RollingSessionWindow {
earliest_session: Some(start),
earliest_session: start,
session_info: (0..=1).map(dummy_session_info).collect(),
window_size: TEST_WINDOW_SIZE,
};
cache_session_info_test(0, 3, window, 2);
cache_session_info_test(0, 3, Some(window), 2);
}
#[test]
fn any_session_unavailable_for_caching_means_no_change() {
let session: SessionIndex = 6;
let start_session = session.saturating_sub(TEST_WINDOW_SIZE - 1);
let start_session = session.saturating_sub(TEST_WINDOW_SIZE.get() - 1);
let header = Header {
digest: Default::default(),
@@ -490,13 +484,11 @@ mod tests {
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
let mut window = RollingSessionWindow::new(TEST_WINDOW_SIZE);
let hash = header.hash();
let test_fut = {
Box::pin(async move {
let res = window.cache_session_info_for_head(&mut ctx, hash).await;
let res = RollingSessionWindow::new(&mut ctx, TEST_WINDOW_SIZE, hash).await;
assert!(res.is_err());
})
};
@@ -551,14 +543,14 @@ mod tests {
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
let mut window = RollingSessionWindow::new(TEST_WINDOW_SIZE);
let hash = header.hash();
let test_fut = {
Box::pin(async move {
window.cache_session_info_for_head(&mut ctx, hash).await.unwrap();
let window =
RollingSessionWindow::new(&mut ctx, TEST_WINDOW_SIZE, hash).await.unwrap();
assert_eq!(window.earliest_session, Some(session));
assert_eq!(window.earliest_session, session);
assert_eq!(window.session_info, vec![dummy_session_info(session)]);
})
};