Fix cycle dispute-coordinator <-> dispute-distribution (#6489)

* First iteration of message sender.

* dyn Fn variant (no cloning)

* Full implementation + Clone, without allocs on `Send`

* Further clarifications/cleanup.

* MessageSender -> NestingSender

* Doc update/clarification.

* dispute-coordinator: Send disputes on startup.

+ Some fixes, cleanup.

* Fix whitespace.

* Dispute distribution fixes, cleanup.

* Cargo.lock

* Fix spaces.

* More format fixes.

What is cargo fmt doing actually?

* More fmt fixes.

* Fix nesting sender.

* Fixes.

* Whitespace

* Enable logging.

* Guide update.

* Fmt fixes, typos.

* Remove unused function.

* Simplifications, doc fixes.

* Update roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md

Co-authored-by: Marcin S. <marcin@bytedude.com>

* Fmt + doc example fix.

Co-authored-by: eskimor <eskimor@no-such-url.com>
Co-authored-by: Marcin S. <marcin@bytedude.com>
This commit is contained in:
eskimor
2023-01-10 12:04:05 +01:00
committed by GitHub
parent 44fd95661c
commit cc650fe53d
16 changed files with 778 additions and 642 deletions
@@ -31,6 +31,7 @@ assert_matches = "1.4.0"
test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../../primitives/test-helpers" }
futures-timer = "3.0.2"
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
[features]
# If not enabled, the dispute coordinator will do nothing.
@@ -87,60 +87,69 @@ impl<'a> CandidateEnvironment<'a> {
/// Whether or not we already issued some statement about a candidate.
pub enum OwnVoteState {
/// We already voted/issued a statement for the candidate.
Voted,
/// We already voted/issued a statement for the candidate and it was an approval vote.
/// Our votes, if any.
Voted(Vec<(ValidatorIndex, (DisputeStatement, ValidatorSignature))>),
/// We are not a parachain validator in the session.
///
/// Needs special treatment as we have to make sure to propagate it to peers, to guarantee the
/// dispute can conclude.
VotedApproval(Vec<(ValidatorIndex, ValidatorSignature)>),
/// We not yet voted for the dispute.
NoVote,
/// Hence we cannot vote.
CannotVote,
}
impl OwnVoteState {
fn new<'a>(votes: &CandidateVotes, env: &CandidateEnvironment<'a>) -> Self {
let mut our_valid_votes = env
.controlled_indices()
let controlled_indices = env.controlled_indices();
if controlled_indices.is_empty() {
return Self::CannotVote
}
let our_valid_votes = controlled_indices
.iter()
.filter_map(|i| votes.valid.raw().get_key_value(i))
.peekable();
let mut our_invalid_votes =
env.controlled_indices.iter().filter_map(|i| votes.invalid.get_key_value(i));
let has_valid_votes = our_valid_votes.peek().is_some();
let has_invalid_votes = our_invalid_votes.next().is_some();
let our_approval_votes: Vec<_> = our_valid_votes
.filter_map(|(index, (k, sig))| {
if let ValidDisputeStatementKind::ApprovalChecking = k {
Some((*index, sig.clone()))
} else {
None
}
})
.collect();
.map(|(index, (kind, sig))| (*index, (DisputeStatement::Valid(*kind), sig.clone())));
let our_invalid_votes = controlled_indices
.iter()
.filter_map(|i| votes.invalid.get_key_value(i))
.map(|(index, (kind, sig))| (*index, (DisputeStatement::Invalid(*kind), sig.clone())));
if !our_approval_votes.is_empty() {
return Self::VotedApproval(our_approval_votes)
}
if has_valid_votes || has_invalid_votes {
return Self::Voted
}
Self::NoVote
Self::Voted(our_valid_votes.chain(our_invalid_votes).collect())
}
/// Whether or not we issued a statement for the candidate already.
fn voted(&self) -> bool {
/// Is a vote from us missing but we are a validator able to vote?
fn vote_missing(&self) -> bool {
match self {
Self::Voted | Self::VotedApproval(_) => true,
Self::NoVote => false,
Self::Voted(votes) if votes.is_empty() => true,
Self::Voted(_) | Self::CannotVote => false,
}
}
/// Get own approval votes, if any.
fn approval_votes(&self) -> Option<&Vec<(ValidatorIndex, ValidatorSignature)>> {
///
/// Empty iterator means, no approval votes. `None` means, there will never be any (we cannot
/// vote).
fn approval_votes(
&self,
) -> Option<impl Iterator<Item = (ValidatorIndex, &ValidatorSignature)>> {
match self {
Self::VotedApproval(votes) => Some(&votes),
_ => None,
Self::Voted(votes) => Some(votes.iter().filter_map(|(index, (kind, sig))| {
if let DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking) = kind {
Some((*index, sig))
} else {
None
}
})),
Self::CannotVote => None,
}
}
/// Get our votes if there are any.
///
/// Empty iterator means, no votes. `None` means, there will never be any (we cannot
/// vote).
fn votes(&self) -> Option<&Vec<(ValidatorIndex, (DisputeStatement, ValidatorSignature))>> {
match self {
Self::Voted(votes) => Some(&votes),
Self::CannotVote => None,
}
}
}
@@ -170,7 +179,7 @@ impl CandidateVoteState<CandidateVotes> {
valid: ValidCandidateVotes::new(),
invalid: BTreeMap::new(),
};
Self { votes, own_vote: OwnVoteState::NoVote, dispute_status: None }
Self { votes, own_vote: OwnVoteState::CannotVote, dispute_status: None }
}
/// Create a new `CandidateVoteState` from already existing votes.
@@ -327,16 +336,25 @@ impl<V> CandidateVoteState<V> {
self.dispute_status.map_or(false, |s| s.is_confirmed_concluded())
}
/// This machine already cast some vote in that dispute/for that candidate.
pub fn has_own_vote(&self) -> bool {
self.own_vote.voted()
/// Are we a validator in the session, but have not yet voted?
pub fn own_vote_missing(&self) -> bool {
self.own_vote.vote_missing()
}
/// Own approval votes if any:
pub fn own_approval_votes(&self) -> Option<&Vec<(ValidatorIndex, ValidatorSignature)>> {
pub fn own_approval_votes(
&self,
) -> Option<impl Iterator<Item = (ValidatorIndex, &ValidatorSignature)>> {
self.own_vote.approval_votes()
}
/// Get own votes if there are any.
pub fn own_votes(
&self,
) -> Option<&Vec<(ValidatorIndex, (DisputeStatement, ValidatorSignature))>> {
self.own_vote.votes()
}
/// Whether or not there is a dispute and it has already enough valid votes to conclude.
pub fn has_concluded_for(&self) -> bool {
self.dispute_status.map_or(false, |s| s.has_concluded_for())
@@ -26,8 +26,7 @@ use futures::{
use sc_keystore::LocalKeystore;
use polkadot_node_primitives::{
disputes::ValidCandidateVotes, CandidateVotes, DisputeMessage, DisputeMessageCheckError,
DisputeStatus, SignedDisputeStatement, Timestamp,
disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement, Timestamp,
};
use polkadot_node_subsystem::{
messages::{
@@ -48,6 +47,7 @@ use polkadot_primitives::v2::{
use crate::{
error::{log_error, Error, FatalError, FatalResult, JfyiError, JfyiResult, Result},
import::{CandidateEnvironment, CandidateVoteState},
is_potential_spam,
metrics::Metrics,
status::{get_active_with_status, Clock},
DisputeCoordinatorSubsystem, LOG_TARGET,
@@ -55,7 +55,7 @@ use crate::{
use super::{
backend::Backend,
db,
db, make_dispute_message,
participation::{
self, Participation, ParticipationPriority, ParticipationRequest, ParticipationStatement,
WorkerMessageReceiver,
@@ -396,19 +396,19 @@ impl Initialized {
CompactStatement::Valid(_) =>
ValidDisputeStatementKind::BackingValid(relay_parent),
};
debug_assert!(
SignedDisputeStatement::new_checked(
debug_assert!(
SignedDisputeStatement::new_checked(
DisputeStatement::Valid(valid_statement_kind),
candidate_hash,
session,
validator_public.clone(),
validator_signature.clone(),
).is_ok(),
"Scraped backing votes had invalid signature! candidate: {:?}, session: {:?}, validator_public: {:?}",
candidate_hash,
session,
validator_public,
);
).is_ok(),
"Scraped backing votes had invalid signature! candidate: {:?}, session: {:?}, validator_public: {:?}",
candidate_hash,
session,
validator_public,
);
let signed_dispute_statement =
SignedDisputeStatement::new_unchecked_from_trusted_source(
DisputeStatement::Valid(valid_statement_kind),
@@ -492,20 +492,20 @@ impl Initialized {
})
.cloned()?;
debug_assert!(
SignedDisputeStatement::new_checked(
debug_assert!(
SignedDisputeStatement::new_checked(
dispute_statement.clone(),
candidate_hash,
session,
validator_public.clone(),
validator_signature.clone(),
).is_ok(),
"Scraped dispute votes had invalid signature! candidate: {:?}, session: {:?}, dispute_statement: {:?}, validator_public: {:?}",
candidate_hash,
session,
).is_ok(),
"Scraped dispute votes had invalid signature! candidate: {:?}, session: {:?}, dispute_statement: {:?}, validator_public: {:?}",
candidate_hash,
session,
dispute_statement,
validator_public,
);
validator_public,
);
Some((
SignedDisputeStatement::new_unchecked_from_trusted_source(
@@ -845,18 +845,16 @@ impl Initialized {
let is_included = self.scraper.is_candidate_included(&candidate_hash);
let is_backed = self.scraper.is_candidate_backed(&candidate_hash);
let has_own_vote = new_state.has_own_vote();
let own_vote_missing = new_state.own_vote_missing();
let is_disputed = new_state.is_disputed();
let has_controlled_indices = !env.controlled_indices().is_empty();
let is_confirmed = new_state.is_confirmed();
let potential_spam =
!is_included && !is_backed && !new_state.is_confirmed() && !new_state.has_own_vote();
// We participate only in disputes which are included, backed or confirmed
let allow_participation = is_included || is_backed || is_confirmed;
let potential_spam = is_potential_spam(&self.scraper, &new_state, &candidate_hash);
// We participate only in disputes which are not potential spam.
let allow_participation = !potential_spam;
gum::trace!(
target: LOG_TARGET,
has_own_vote = ?new_state.has_own_vote(),
?own_vote_missing,
?potential_spam,
?is_included,
?candidate_hash,
@@ -903,7 +901,7 @@ impl Initialized {
// - `is_included` lands in prioritised queue
// - `is_confirmed` | `is_backed` lands in best effort queue
// We don't participate in disputes on finalized candidates.
if !has_own_vote && is_disputed && has_controlled_indices && allow_participation {
if own_vote_missing && is_disputed && allow_participation {
let priority = ParticipationPriority::with_priority_if(is_included);
gum::trace!(
target: LOG_TARGET,
@@ -930,9 +928,8 @@ impl Initialized {
target: LOG_TARGET,
?candidate_hash,
?is_confirmed,
?has_own_vote,
?own_vote_missing,
?is_disputed,
?has_controlled_indices,
?allow_participation,
?is_included,
?is_backed,
@@ -946,10 +943,9 @@ impl Initialized {
// Also send any already existing approval vote on new disputes:
if import_result.is_freshly_disputed() {
let no_votes = Vec::new();
let our_approval_votes = new_state.own_approval_votes().unwrap_or(&no_votes);
let our_approval_votes = new_state.own_approval_votes().into_iter().flatten();
for (validator_index, sig) in our_approval_votes {
let pub_key = match env.validators().get(*validator_index) {
let pub_key = match env.validators().get(validator_index) {
None => {
gum::error!(
target: LOG_TARGET,
@@ -979,7 +975,7 @@ impl Initialized {
env.session_info(),
&new_state.votes(),
statement,
*validator_index,
validator_index,
) {
Err(err) => {
gum::error!(
@@ -1150,9 +1146,9 @@ impl Initialized {
Ok(None) => {},
Err(e) => {
gum::error!(
target: LOG_TARGET,
err = ?e,
"Encountered keystore error while signing dispute statement",
target: LOG_TARGET,
err = ?e,
"Encountered keystore error while signing dispute statement",
);
},
}
@@ -1251,74 +1247,6 @@ impl MaybeCandidateReceipt {
}
}
#[derive(Debug, thiserror::Error)]
enum DisputeMessageCreationError {
#[error("There was no opposite vote available")]
NoOppositeVote,
#[error("Found vote had an invalid validator index that could not be found")]
InvalidValidatorIndex,
#[error("Statement found in votes had invalid signature.")]
InvalidStoredStatement,
#[error(transparent)]
InvalidStatementCombination(DisputeMessageCheckError),
}
fn make_dispute_message(
info: &SessionInfo,
votes: &CandidateVotes,
our_vote: SignedDisputeStatement,
our_index: ValidatorIndex,
) -> std::result::Result<DisputeMessage, DisputeMessageCreationError> {
let validators = &info.validators;
let (valid_statement, valid_index, invalid_statement, invalid_index) =
if let DisputeStatement::Valid(_) = our_vote.statement() {
let (validator_index, (statement_kind, validator_signature)) =
votes.invalid.iter().next().ok_or(DisputeMessageCreationError::NoOppositeVote)?;
let other_vote = SignedDisputeStatement::new_checked(
DisputeStatement::Invalid(*statement_kind),
*our_vote.candidate_hash(),
our_vote.session_index(),
validators
.get(*validator_index)
.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
.clone(),
validator_signature.clone(),
)
.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
(our_vote, our_index, other_vote, *validator_index)
} else {
let (validator_index, (statement_kind, validator_signature)) = votes
.valid
.raw()
.iter()
.next()
.ok_or(DisputeMessageCreationError::NoOppositeVote)?;
let other_vote = SignedDisputeStatement::new_checked(
DisputeStatement::Valid(*statement_kind),
*our_vote.candidate_hash(),
our_vote.session_index(),
validators
.get(*validator_index)
.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
.clone(),
validator_signature.clone(),
)
.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
(other_vote, *validator_index, our_vote, our_index)
};
DisputeMessage::from_signed_statements(
valid_statement,
valid_index,
invalid_statement,
invalid_index,
votes.candidate_receipt.clone(),
info,
)
.map_err(DisputeMessageCreationError::InvalidStatementCombination)
}
/// Determine the best block and its block number.
/// Assumes `block_descriptions` are sorted from the one
/// with the lowest `BlockNumber` to the highest.
+211 -56
View File
@@ -28,17 +28,21 @@ use std::sync::Arc;
use futures::FutureExt;
use gum::CandidateHash;
use sc_keystore::LocalKeystore;
use polkadot_node_primitives::CandidateVotes;
use polkadot_node_primitives::{
CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement,
};
use polkadot_node_subsystem::{
overseer, ActivatedLeaf, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
messages::DisputeDistributionMessage, overseer, ActivatedLeaf, FromOrchestra, OverseerSignal,
SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util::{
database::Database,
rolling_session_window::{DatabaseParams, RollingSessionWindow},
};
use polkadot_primitives::v2::{ScrapedOnChainVotes, ValidatorIndex, ValidatorPair};
use polkadot_primitives::v2::{DisputeStatement, ScrapedOnChainVotes, SessionInfo, ValidatorIndex};
use crate::{
error::{FatalResult, JfyiError, Result},
@@ -50,6 +54,7 @@ use db::v1::DbBackend;
use fatality::Split;
use self::{
import::{CandidateEnvironment, CandidateVoteState},
participation::{ParticipationPriority, ParticipationRequest},
spam_slots::{SpamSlots, UnconfirmedDisputes},
};
@@ -274,10 +279,13 @@ impl DisputeCoordinatorSubsystem {
// Prune obsolete disputes:
db::v1::note_earliest_session(overlay_db, rolling_session_window.earliest_session())?;
let now = clock.now();
let active_disputes = match overlay_db.load_recent_disputes() {
Ok(Some(disputes)) =>
get_active_with_status(disputes.into_iter(), clock.now()).collect(),
Ok(None) => Vec::new(),
Ok(disputes) => disputes
.map(|disputes| get_active_with_status(disputes.into_iter(), now))
.into_iter()
.flatten(),
Err(e) => {
gum::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e);
return Err(e.into())
@@ -285,9 +293,23 @@ impl DisputeCoordinatorSubsystem {
};
let mut participation_requests = Vec::new();
let mut unconfirmed_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new();
let mut spam_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new();
let (scraper, votes) = ChainScraper::new(ctx.sender(), initial_head).await?;
for ((session, ref candidate_hash), status) in active_disputes {
for ((session, ref candidate_hash), _) in active_disputes {
let env =
match CandidateEnvironment::new(&self.keystore, &rolling_session_window, session) {
None => {
gum::warn!(
target: LOG_TARGET,
session,
"We are lacking a `SessionInfo` for handling db votes on startup."
);
continue
},
Some(env) => env,
};
let votes: CandidateVotes =
match overlay_db.load_candidate_votes(session, candidate_hash) {
Ok(Some(votes)) => votes.into(),
@@ -301,60 +323,52 @@ impl DisputeCoordinatorSubsystem {
continue
},
};
let vote_state = CandidateVoteState::new(votes, &env, now);
let validators = match rolling_session_window.session_info(session) {
None => {
gum::warn!(
let potential_spam = is_potential_spam(&scraper, &vote_state, candidate_hash);
let is_included =
scraper.is_candidate_included(&vote_state.votes().candidate_receipt.hash());
if potential_spam {
gum::trace!(
target: LOG_TARGET,
?session,
?candidate_hash,
"Found potential spam dispute on startup"
);
spam_disputes
.insert((session, *candidate_hash), vote_state.votes().voted_indices());
} else {
// Participate if need be:
if vote_state.own_vote_missing() {
gum::trace!(
target: LOG_TARGET,
session,
"Missing info for session which has an active dispute",
?session,
?candidate_hash,
"Found valid dispute, with no vote from us on startup - participating."
);
continue
},
Some(info) => info.validators.clone(),
};
let voted_indices = votes.voted_indices();
// Determine if there are any missing local statements for this dispute. Validators are
// filtered if:
// 1) their statement already exists, or
// 2) the validator key is not in the local keystore (i.e. the validator is remote).
// The remaining set only contains local validators that are also missing statements.
let missing_local_statement = validators
.iter()
.enumerate()
.map(|(index, validator)| (ValidatorIndex(index as _), validator))
.any(|(index, validator)| {
!voted_indices.contains(&index) &&
self.keystore
.key_pair::<ValidatorPair>(validator)
.ok()
.map_or(false, |v| v.is_some())
});
let is_included = scraper.is_candidate_included(&votes.candidate_receipt.hash());
if !status.is_confirmed_concluded() && !is_included {
unconfirmed_disputes.insert((session, *candidate_hash), voted_indices);
}
// Participate for all non-concluded disputes which do not have a
// recorded local statement.
if missing_local_statement {
participation_requests.push((
ParticipationPriority::with_priority_if(is_included),
ParticipationRequest::new(votes.candidate_receipt.clone(), session),
));
participation_requests.push((
ParticipationPriority::with_priority_if(is_included),
ParticipationRequest::new(
vote_state.votes().candidate_receipt.clone(),
session,
),
));
}
// Else make sure our own vote is distributed:
else {
gum::trace!(
target: LOG_TARGET,
?session,
?candidate_hash,
"Found valid dispute, with vote from us on startup - send vote."
);
send_dispute_messages(ctx, &env, &vote_state).await;
}
}
}
Ok((
participation_requests,
votes,
SpamSlots::recover_from_state(unconfirmed_disputes),
scraper,
))
Ok((participation_requests, votes, SpamSlots::recover_from_state(spam_disputes), scraper))
}
}
@@ -407,3 +421,144 @@ async fn wait_for_first_leaf<Context>(ctx: &mut Context) -> Result<Option<Activa
}
}
}
/// Check wheter a dispute for the given candidate could be spam.
///
/// That is the candidate could be made up.
pub fn is_potential_spam<V>(
scraper: &ChainScraper,
vote_state: &CandidateVoteState<V>,
candidate_hash: &CandidateHash,
) -> bool {
let is_disputed = vote_state.is_disputed();
let is_included = scraper.is_candidate_included(candidate_hash);
let is_backed = scraper.is_candidate_backed(candidate_hash);
let is_confirmed = vote_state.is_confirmed();
is_disputed && !is_included && !is_backed && !is_confirmed
}
/// Tell dispute-distribution to send all our votes.
///
/// Should be called on startup for all active disputes where there are votes from us already.
#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
async fn send_dispute_messages<Context>(
ctx: &mut Context,
env: &CandidateEnvironment<'_>,
vote_state: &CandidateVoteState<CandidateVotes>,
) {
for own_vote in vote_state.own_votes().into_iter().flatten() {
let (validator_index, (kind, sig)) = own_vote;
let public_key = if let Some(key) = env.session_info().validators.get(*validator_index) {
key.clone()
} else {
gum::error!(
target: LOG_TARGET,
?validator_index,
session_index = ?env.session_index(),
"Could not find our own key in `SessionInfo`"
);
continue
};
let our_vote_signed = SignedDisputeStatement::new_checked(
kind.clone(),
vote_state.votes().candidate_receipt.hash(),
env.session_index(),
public_key,
sig.clone(),
);
let our_vote_signed = match our_vote_signed {
Ok(signed) => signed,
Err(()) => {
gum::error!(
target: LOG_TARGET,
"Checking our own signature failed - db corruption?"
);
continue
},
};
let dispute_message = match make_dispute_message(
env.session_info(),
vote_state.votes(),
our_vote_signed,
*validator_index,
) {
Err(err) => {
gum::debug!(target: LOG_TARGET, ?err, "Creating dispute message failed.");
continue
},
Ok(dispute_message) => dispute_message,
};
ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_message)).await;
}
}
#[derive(Debug, thiserror::Error)]
pub enum DisputeMessageCreationError {
#[error("There was no opposite vote available")]
NoOppositeVote,
#[error("Found vote had an invalid validator index that could not be found")]
InvalidValidatorIndex,
#[error("Statement found in votes had invalid signature.")]
InvalidStoredStatement,
#[error(transparent)]
InvalidStatementCombination(DisputeMessageCheckError),
}
/// Create a `DisputeMessage` to be sent to `DisputeDistribution`.
pub fn make_dispute_message(
info: &SessionInfo,
votes: &CandidateVotes,
our_vote: SignedDisputeStatement,
our_index: ValidatorIndex,
) -> std::result::Result<DisputeMessage, DisputeMessageCreationError> {
let validators = &info.validators;
let (valid_statement, valid_index, invalid_statement, invalid_index) =
if let DisputeStatement::Valid(_) = our_vote.statement() {
let (validator_index, (statement_kind, validator_signature)) =
votes.invalid.iter().next().ok_or(DisputeMessageCreationError::NoOppositeVote)?;
let other_vote = SignedDisputeStatement::new_checked(
DisputeStatement::Invalid(*statement_kind),
*our_vote.candidate_hash(),
our_vote.session_index(),
validators
.get(*validator_index)
.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
.clone(),
validator_signature.clone(),
)
.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
(our_vote, our_index, other_vote, *validator_index)
} else {
let (validator_index, (statement_kind, validator_signature)) = votes
.valid
.raw()
.iter()
.next()
.ok_or(DisputeMessageCreationError::NoOppositeVote)?;
let other_vote = SignedDisputeStatement::new_checked(
DisputeStatement::Valid(*statement_kind),
*our_vote.candidate_hash(),
our_vote.session_index(),
validators
.get(*validator_index)
.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
.clone(),
validator_signature.clone(),
)
.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
(other_vote, *validator_index, our_vote, our_index)
};
DisputeMessage::from_signed_statements(
valid_statement,
valid_index,
invalid_statement,
invalid_index,
votes.candidate_receipt.clone(),
info,
)
.map_err(DisputeMessageCreationError::InvalidStatementCombination)
}
@@ -32,7 +32,7 @@ use futures::{
use polkadot_node_subsystem_util::database::Database;
use polkadot_node_primitives::{
DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement,
DisputeMessage, DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement,
};
use polkadot_node_subsystem::{
messages::{
@@ -291,6 +291,7 @@ impl TestState {
.await;
}
/// Returns any sent `DisputeMessage`s.
async fn handle_sync_queries(
&mut self,
virtual_overseer: &mut VirtualOverseer,
@@ -298,7 +299,7 @@ impl TestState {
block_number: BlockNumber,
session: SessionIndex,
candidate_events: Vec<CandidateEvent>,
) {
) -> Vec<DisputeMessage> {
// Order of messages is not fixed (different on initializing):
#[derive(Debug)]
struct FinishedSteps {
@@ -316,6 +317,7 @@ impl TestState {
}
let mut finished_steps = FinishedSteps::new();
let mut sent_disputes = Vec::new();
while !finished_steps.is_done() {
let recv = overseer_recv(virtual_overseer).await;
@@ -400,6 +402,9 @@ impl TestState {
let block_num = self.headers.get(&hash).map(|header| header.number);
tx.send(Ok(block_num)).unwrap();
},
AllMessages::DisputeDistribution(DisputeDistributionMessage::SendDispute(msg)) => {
sent_disputes.push(msg);
},
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_new_leaf,
RuntimeApiRequest::CandidateEvents(tx),
@@ -439,14 +444,25 @@ impl TestState {
},
}
}
return sent_disputes
}
async fn handle_resume_sync(
&mut self,
virtual_overseer: &mut VirtualOverseer,
session: SessionIndex,
) {
) -> Vec<DisputeMessage> {
self.handle_resume_sync_with_events(virtual_overseer, session, Vec::new()).await
}
async fn handle_resume_sync_with_events(
&mut self,
virtual_overseer: &mut VirtualOverseer,
session: SessionIndex,
mut initial_events: Vec<CandidateEvent>,
) -> Vec<DisputeMessage> {
let leaves: Vec<Hash> = self.headers.keys().cloned().collect();
let mut messages = Vec::new();
for (n, leaf) in leaves.iter().enumerate() {
gum::debug!(
block_number= ?n,
@@ -463,15 +479,14 @@ impl TestState {
)))
.await;
self.handle_sync_queries(
virtual_overseer,
*leaf,
n as BlockNumber,
session,
Vec::new(),
)
.await;
let events = if n == 1 { std::mem::take(&mut initial_events) } else { Vec::new() };
let mut new_messages = self
.handle_sync_queries(virtual_overseer, *leaf, n as BlockNumber, session, events)
.await;
messages.append(&mut new_messages);
}
messages
}
fn session_info(&self) -> SessionInfo {
@@ -2148,6 +2163,7 @@ fn concluded_supermajority_against_non_active_after_time() {
#[test]
fn resume_dispute_without_local_statement() {
sp_tracing::init_for_tests();
let session = 1;
test_harness(|mut test_state, mut virtual_overseer| {
@@ -2188,10 +2204,8 @@ fn resume_dispute_without_local_statement() {
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
// Missing availability -> No local vote.
// Participation won't happen here because the dispute is neither backed, not confirmed
// nor the candidate is included. Or in other words - we'll refrain from participation.
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
{
@@ -2216,7 +2230,17 @@ fn resume_dispute_without_local_statement() {
// local statement for the active dispute.
.resume(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = make_valid_candidate_receipt();
// Candidate is now backed:
let dispute_messages = test_state
.handle_resume_sync_with_events(
&mut virtual_overseer,
session,
vec![make_candidate_backed_event(candidate_receipt.clone())],
)
.await;
assert_eq!(dispute_messages.len(), 0, "We don't expect any messages sent here.");
let candidate_receipt = make_valid_candidate_receipt();
let candidate_hash = candidate_receipt.hash();
@@ -2282,6 +2306,7 @@ fn resume_dispute_without_local_statement() {
#[test]
fn resume_dispute_with_local_statement() {
sp_tracing::init_for_tests();
let session = 1;
test_harness(|mut test_state, mut virtual_overseer| {
@@ -2359,10 +2384,19 @@ fn resume_dispute_with_local_statement() {
})
})
// Alice should not send a DisputeParticiationMessage::Participate on restart since she has a
// local statement for the active dispute.
// local statement for the active dispute, instead she should try to (re-)send her vote.
.resume(|mut test_state, mut virtual_overseer| {
let candidate_receipt = make_valid_candidate_receipt();
Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let messages = test_state
.handle_resume_sync_with_events(
&mut virtual_overseer,
session,
vec![make_candidate_backed_event(candidate_receipt.clone())],
)
.await;
assert_eq!(messages.len(), 1, "A message should have gone out.");
// Assert that subsystem is not sending Participation messages because we issued a local statement
assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
@@ -2390,7 +2424,12 @@ fn resume_dispute_without_local_statement_or_local_key() {
let candidate_hash = candidate_receipt.hash();
test_state
.activate_leaf_at_session(&mut virtual_overseer, session, 1, Vec::new())
.activate_leaf_at_session(
&mut virtual_overseer,
session,
1,
vec![make_candidate_included_event(candidate_receipt.clone())],
)
.await;
let (valid_vote, invalid_vote) = generate_opposing_votes_pair(
@@ -2464,101 +2503,6 @@ fn resume_dispute_without_local_statement_or_local_key() {
});
}
#[test]
fn resume_dispute_with_local_statement_without_local_key() {
let session = 1;
let test_state = TestState::default();
let mut test_state = test_state.resume(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = make_valid_candidate_receipt();
let candidate_hash = candidate_receipt.hash();
test_state
.activate_leaf_at_session(&mut virtual_overseer, session, 1, Vec::new())
.await;
let local_valid_vote = test_state
.issue_explicit_statement_with_index(
ValidatorIndex(0),
candidate_hash,
session,
true,
)
.await;
let (valid_vote, invalid_vote) = generate_opposing_votes_pair(
&test_state,
ValidatorIndex(1),
ValidatorIndex(2),
candidate_hash,
session,
VoteType::Explicit,
)
.await;
let (pending_confirmation, confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(local_valid_vote, ValidatorIndex(0)),
(valid_vote, ValidatorIndex(1)),
(invalid_vote, ValidatorIndex(2)),
],
pending_confirmation: Some(pending_confirmation),
},
})
.await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
{
let (tx, rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
})
.await;
assert_eq!(rx.await.unwrap().len(), 1);
}
virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
})
});
// No keys:
test_state.subsystem_keystore =
make_keystore(vec![Sr25519Keyring::Two.to_seed()].into_iter()).into();
// Two should not send a DisputeParticiationMessage::Participate on restart since we gave
// her a non existing key.
test_state.resume(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
// Assert that subsystem is not sending Participation messages because we don't
// have a key.
assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
})
});
}
#[test]
fn issue_valid_local_statement_does_cause_distribution_but_not_duplicate_participation() {
issue_local_statement_does_cause_distribution_but_not_duplicate_participation(true);
@@ -29,6 +29,7 @@ use std::{num::NonZeroUsize, time::Duration};
use futures::{channel::mpsc, FutureExt, StreamExt, TryFutureExt};
use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery;
use polkadot_node_subsystem_util::nesting_sender::NestingSender;
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_network_protocol::request_response::{incoming::IncomingRequestReceiver, v1};
@@ -51,33 +52,33 @@ use polkadot_node_subsystem_util::{runtime, runtime::RuntimeInfo};
/// to this subsystem, unknown dispute. This is to make sure, we get our vote out, even on
/// restarts.
///
/// The actual work of sending and keeping track of transmission attempts to each validator for a
/// particular dispute are done by [`SendTask`]. The purpose of the `DisputeSender` is to keep
/// track of all ongoing disputes and start and clean up `SendTask`s accordingly.
/// The actual work of sending and keeping track of transmission attempts to each validator for a
/// particular dispute are done by [`SendTask`]. The purpose of the `DisputeSender` is to keep
/// track of all ongoing disputes and start and clean up `SendTask`s accordingly.
mod sender;
use self::sender::{DisputeSender, TaskFinish};
use self::sender::{DisputeSender, DisputeSenderMessage};
/// ## The receiver [`DisputesReceiver`]
/// ## The receiver [`DisputesReceiver`]
///
/// The receiving side is implemented as `DisputesReceiver` and is run as a separate long running task within
/// this subsystem ([`DisputesReceiver::run`]).
/// The receiving side is implemented as `DisputesReceiver` and is run as a separate long running task within
/// this subsystem ([`DisputesReceiver::run`]).
///
/// Conceptually all the receiver has to do, is waiting for incoming requests which are passed in
/// via a dedicated channel and forwarding them to the dispute coordinator via
/// `DisputeCoordinatorMessage::ImportStatements`. Being the interface to the network and untrusted
/// nodes, the reality is not that simple of course. Before importing statements the receiver will
/// batch up imports as well as possible for efficient imports while maintaining timely dispute
/// resolution and handling of spamming validators:
/// Conceptually all the receiver has to do, is waiting for incoming requests which are passed in
/// via a dedicated channel and forwarding them to the dispute coordinator via
/// `DisputeCoordinatorMessage::ImportStatements`. Being the interface to the network and untrusted
/// nodes, the reality is not that simple of course. Before importing statements the receiver will
/// batch up imports as well as possible for efficient imports while maintaining timely dispute
/// resolution and handling of spamming validators:
///
/// - Drop all messages from non validator nodes, for this it requires the [`AuthorityDiscovery`]
/// service.
/// - Drop messages from a node, if it sends at a too high rate.
/// - Filter out duplicate messages (over some period of time).
/// - Drop any obviously invalid votes (invalid signatures for example).
/// - Ban peers whose votes were deemed invalid.
/// - Drop all messages from non validator nodes, for this it requires the [`AuthorityDiscovery`]
/// service.
/// - Drop messages from a node, if it sends at a too high rate.
/// - Filter out duplicate messages (over some period of time).
/// - Drop any obviously invalid votes (invalid signatures for example).
/// - Ban peers whose votes were deemed invalid.
///
/// In general dispute-distribution works on limiting the work the dispute-coordinator will have to
/// do, while at the same time making it aware of new disputes as fast as possible.
/// In general dispute-distribution works on limiting the work the dispute-coordinator will have to
/// do, while at the same time making it aware of new disputes as fast as possible.
///
/// For successfully imported votes, we will confirm the receipt of the message back to the sender.
/// This way a received confirmation guarantees, that the vote has been stored to disk by the
@@ -87,7 +88,7 @@ use self::receiver::DisputesReceiver;
/// Error and [`Result`] type for this subsystem.
mod error;
use error::{log_error, FatalError, FatalResult, Result};
use error::{log_error, Error, FatalError, FatalResult, Result};
#[cfg(test)]
mod tests;
@@ -118,10 +119,10 @@ pub struct DisputeDistributionSubsystem<AD> {
runtime: RuntimeInfo,
/// Sender for our dispute requests.
disputes_sender: DisputeSender,
disputes_sender: DisputeSender<DisputeSenderMessage>,
/// Receive messages from `SendTask`.
sender_rx: mpsc::Receiver<TaskFinish>,
/// Receive messages from `DisputeSender` background tasks.
sender_rx: mpsc::Receiver<DisputeSenderMessage>,
/// Receiver for incoming requests.
req_receiver: Option<IncomingRequestReceiver<v1::DisputeRequest>>,
@@ -167,7 +168,7 @@ where
session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize)
.expect("Dispute window can not be 0; qed"),
});
let (tx, sender_rx) = mpsc::channel(1);
let (tx, sender_rx) = NestingSender::new_root(1);
let disputes_sender = DisputeSender::new(tx, metrics.clone());
Self {
runtime,
@@ -216,9 +217,16 @@ where
log_error(result, "on FromOrchestra")?;
},
MuxedMessage::Sender(result) => {
self.disputes_sender
.on_task_message(result.ok_or(FatalError::SenderExhausted)?)
.await;
let result = self
.disputes_sender
.on_message(
&mut ctx,
&mut self.runtime,
result.ok_or(FatalError::SenderExhausted)?,
)
.await
.map_err(Error::Sender);
log_error(result, "on_message")?;
},
}
}
@@ -260,14 +268,14 @@ enum MuxedMessage {
/// Messages from other subsystems.
Subsystem(FatalResult<FromOrchestra<DisputeDistributionMessage>>),
/// Messages from spawned sender background tasks.
Sender(Option<TaskFinish>),
Sender(Option<DisputeSenderMessage>),
}
#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
impl MuxedMessage {
async fn receive<Context>(
ctx: &mut Context,
from_sender: &mut mpsc::Receiver<TaskFinish>,
from_sender: &mut mpsc::Receiver<DisputeSenderMessage>,
) -> Self {
// We are only fusing here to make `select` happy, in reality we will quit if the stream
// ends.
@@ -21,19 +21,17 @@ use std::{
time::Duration,
};
use futures::{
channel::{mpsc, oneshot},
future::poll_fn,
Future,
};
use futures::{channel::oneshot, future::poll_fn, Future};
use futures_timer::Delay;
use indexmap::{map::Entry, IndexMap};
use polkadot_node_network_protocol::request_response::v1::DisputeRequest;
use polkadot_node_primitives::{CandidateVotes, DisputeMessage, SignedDisputeStatement};
use polkadot_node_subsystem::{messages::DisputeCoordinatorMessage, overseer, ActiveLeavesUpdate};
use polkadot_node_subsystem_util::runtime::RuntimeInfo;
use polkadot_primitives::v2::{CandidateHash, DisputeStatement, Hash, SessionIndex};
use polkadot_node_primitives::{DisputeMessage, DisputeStatus};
use polkadot_node_subsystem::{
messages::DisputeCoordinatorMessage, overseer, ActiveLeavesUpdate, SubsystemSender,
};
use polkadot_node_subsystem_util::{nesting_sender::NestingSender, runtime::RuntimeInfo};
use polkadot_primitives::v2::{CandidateHash, Hash, SessionIndex};
/// For each ongoing dispute we have a `SendTask` which takes care of it.
///
@@ -53,6 +51,15 @@ pub use error::{Error, FatalError, JfyiError, Result};
use self::error::JfyiErrorResult;
use crate::{Metrics, LOG_TARGET, SEND_RATE_LIMIT};
/// Messages as sent by background tasks.
#[derive(Debug)]
pub enum DisputeSenderMessage {
/// A task finished.
TaskFinish(TaskFinish),
/// A request for active disputes to the dispute-coordinator finished.
ActiveDisputesReady(JfyiErrorResult<Vec<(SessionIndex, CandidateHash, DisputeStatus)>>),
}
/// The `DisputeSender` keeps track of all ongoing disputes we need to send statements out.
///
/// For each dispute a `SendTask` is responsible for sending to the concerned validators for that
@@ -60,7 +67,7 @@ use crate::{Metrics, LOG_TARGET, SEND_RATE_LIMIT};
/// sessions/validator sets and cleans them up when they become obsolete.
///
/// The unit of work for the `DisputeSender` is a dispute, represented by `SendTask`s.
pub struct DisputeSender {
pub struct DisputeSender<M> {
/// All heads we currently consider active.
active_heads: Vec<Hash>,
@@ -72,10 +79,13 @@ pub struct DisputeSender {
/// All ongoing dispute sendings this subsystem is aware of.
///
/// Using an `IndexMap` so items can be iterated in the order of insertion.
disputes: IndexMap<CandidateHash, SendTask>,
disputes: IndexMap<CandidateHash, SendTask<M>>,
/// Sender to be cloned for `SendTask`s.
tx: mpsc::Sender<TaskFinish>,
tx: NestingSender<M, DisputeSenderMessage>,
/// `Some` if we are waiting for a response `DisputeCoordinatorMessage::ActiveDisputes`.
waiting_for_active_disputes: Option<WaitForActiveDisputesState>,
/// Future for delaying too frequent creation of dispute sending tasks.
rate_limit: RateLimit,
@@ -84,15 +94,25 @@ pub struct DisputeSender {
metrics: Metrics,
}
/// State we keep while waiting for active disputes.
///
/// When we send `DisputeCoordinatorMessage::ActiveDisputes`, this is the state we keep while
/// waiting for the response.
struct WaitForActiveDisputesState {
/// Have we seen any new sessions since last refresh?
have_new_sessions: bool,
}
#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
impl DisputeSender {
impl<M: 'static + Send + Sync> DisputeSender<M> {
/// Create a new `DisputeSender` which can be used to start dispute sendings.
pub fn new(tx: mpsc::Sender<TaskFinish>, metrics: Metrics) -> Self {
pub fn new(tx: NestingSender<M, DisputeSenderMessage>, metrics: Metrics) -> Self {
Self {
active_heads: Vec::new(),
active_sessions: HashMap::new(),
disputes: IndexMap::new(),
tx,
waiting_for_active_disputes: None,
rate_limit: RateLimit::new(),
metrics,
}
@@ -122,7 +142,7 @@ impl DisputeSender {
ctx,
runtime,
&self.active_sessions,
self.tx.clone(),
NestingSender::new(self.tx.clone(), DisputeSenderMessage::TaskFinish),
req,
&self.metrics,
)
@@ -133,14 +153,47 @@ impl DisputeSender {
Ok(())
}
/// Receive message from a background task.
pub async fn on_message<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
msg: DisputeSenderMessage,
) -> Result<()> {
match msg {
DisputeSenderMessage::TaskFinish(msg) => {
let TaskFinish { candidate_hash, receiver, result } = msg;
self.metrics.on_sent_request(result.as_metrics_label());
let task = match self.disputes.get_mut(&candidate_hash) {
None => {
// Can happen when a dispute ends, with messages still in queue:
gum::trace!(
target: LOG_TARGET,
?result,
"Received `FromSendingTask::Finished` for non existing dispute."
);
return Ok(())
},
Some(task) => task,
};
task.on_finished_send(&receiver, result);
},
DisputeSenderMessage::ActiveDisputesReady(result) => {
let state = self.waiting_for_active_disputes.take();
let have_new_sessions = state.map(|s| s.have_new_sessions).unwrap_or(false);
let active_disputes = result?;
self.handle_new_active_disputes(ctx, runtime, active_disputes, have_new_sessions)
.await?;
},
}
Ok(())
}
/// Take care of a change in active leaves.
///
/// - Initiate a retry of failed sends which are still active.
/// - Get new authorities to send messages to.
/// - Get rid of obsolete tasks and disputes.
/// - Get dispute sending started in case we missed one for some reason (e.g. on node startup)
///
/// This function ensures the `SEND_RATE_LIMIT`, therefore it might block.
/// Update our knowledge on sessions and initiate fetching for new active disputes.
pub async fn update_leaves<Context>(
&mut self,
ctx: &mut Context,
@@ -154,14 +207,58 @@ impl DisputeSender {
let have_new_sessions = self.refresh_sessions(ctx, runtime).await?;
let active_disputes = get_active_disputes(ctx).await?;
let unknown_disputes = {
let mut disputes = active_disputes.clone();
disputes.retain(|(_, c)| !self.disputes.contains_key(c));
disputes
};
// Not yet waiting for data, request an update:
match self.waiting_for_active_disputes.take() {
None => {
self.waiting_for_active_disputes =
Some(WaitForActiveDisputesState { have_new_sessions });
let mut sender = ctx.sender().clone();
let mut tx = self.tx.clone();
let active_disputes: HashSet<_> = active_disputes.into_iter().map(|(_, c)| c).collect();
let get_active_disputes_task = async move {
let result = get_active_disputes(&mut sender).await;
let result =
tx.send_message(DisputeSenderMessage::ActiveDisputesReady(result)).await;
if let Err(err) = result {
gum::debug!(
target: LOG_TARGET,
?err,
"Sending `DisputeSenderMessage` from background task failed."
);
}
};
ctx.spawn("get_active_disputes", Box::pin(get_active_disputes_task))
.map_err(FatalError::SpawnTask)?;
},
Some(state) => {
let have_new_sessions = state.have_new_sessions || have_new_sessions;
let new_state = WaitForActiveDisputesState { have_new_sessions };
self.waiting_for_active_disputes = Some(new_state);
gum::debug!(
target: LOG_TARGET,
"Dispute coordinator slow? We are still waiting for data on next active leaves update."
);
},
}
Ok(())
}
/// Handle new active disputes response.
///
/// - Initiate a retry of failed sends which are still active.
/// - Get new authorities to send messages to.
/// - Get rid of obsolete tasks and disputes.
///
/// This function ensures the `SEND_RATE_LIMIT`, therefore it might block.
async fn handle_new_active_disputes<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
active_disputes: Vec<(SessionIndex, CandidateHash, DisputeStatus)>,
have_new_sessions: bool,
) -> Result<()> {
let active_disputes: HashSet<_> = active_disputes.into_iter().map(|(_, c, _)| c).collect();
// Cleanup obsolete senders (retain keeps order of remaining elements):
self.disputes
@@ -188,165 +285,9 @@ impl DisputeSender {
should_rate_limit = sends_happened && have_new_sessions;
}
}
// This should only be non-empty on startup, but if not - we got you covered.
//
// Initial order will not be maintained in that case, but that should be fine as disputes
// recovered at startup will be relatively "old" anyway and we assume that no more than a
// third of the validators will go offline at any point in time anyway.
for dispute in unknown_disputes {
// Rate limiting handled inside `start_send_for_dispute` (calls `start_sender`).
self.start_send_for_dispute(ctx, runtime, dispute).await?;
}
Ok(())
}
/// Receive message from a sending task.
pub async fn on_task_message(&mut self, msg: TaskFinish) {
let TaskFinish { candidate_hash, receiver, result } = msg;
self.metrics.on_sent_request(result.as_metrics_label());
let task = match self.disputes.get_mut(&candidate_hash) {
None => {
// Can happen when a dispute ends, with messages still in queue:
gum::trace!(
target: LOG_TARGET,
?result,
"Received `FromSendingTask::Finished` for non existing dispute."
);
return
},
Some(task) => task,
};
task.on_finished_send(&receiver, result);
}
/// Call `start_sender` on all passed in disputes.
///
/// Recover necessary votes for building up `DisputeMessage` and start sending for all of them.
async fn start_send_for_dispute<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
dispute: (SessionIndex, CandidateHash),
) -> Result<()> {
let (session_index, candidate_hash) = dispute;
// A relay chain head is required as context for receiving session info information from runtime and
// storage. We will iterate `active_sessions` to find a suitable head. We assume that there is at
// least one active head which, by `session_index`, is at least as recent as the `dispute` passed in.
// We need to avoid picking an older one from a session that might not yet exist in storage.
// Related to <https://github.com/paritytech/polkadot/issues/4730> .
let ref_head = self
.active_sessions
.iter()
.find_map(|(active_session_index, head_hash)| {
// There might be more than one session index that is at least as recent as the dispute
// so we just pick the first one. Keep in mind we are talking about the session index for the
// child of block identified by `head_hash` and not the session index for the block.
if active_session_index >= &session_index {
Some(head_hash)
} else {
None
}
})
.ok_or(JfyiError::NoActiveHeads)?;
let info = runtime
.get_session_info_by_index(ctx.sender(), *ref_head, session_index)
.await?;
let our_index = match info.validator_info.our_index {
None => {
gum::trace!(
target: LOG_TARGET,
"Not a validator in that session - not starting dispute sending."
);
return Ok(())
},
Some(index) => index,
};
let votes = match get_candidate_votes(ctx, session_index, candidate_hash).await? {
None => {
gum::debug!(
target: LOG_TARGET,
?session_index,
?candidate_hash,
"No votes for active dispute?! - possible, due to race."
);
return Ok(())
},
Some(votes) => votes,
};
let our_valid_vote = votes.valid.raw().get(&our_index);
let our_invalid_vote = votes.invalid.get(&our_index);
let (valid_vote, invalid_vote) = if let Some(our_valid_vote) = our_valid_vote {
// Get some invalid vote as well:
let invalid_vote =
votes.invalid.iter().next().ok_or(JfyiError::MissingVotesFromCoordinator)?;
((&our_index, our_valid_vote), invalid_vote)
} else if let Some(our_invalid_vote) = our_invalid_vote {
// Get some valid vote as well:
let valid_vote =
votes.valid.raw().iter().next().ok_or(JfyiError::MissingVotesFromCoordinator)?;
(valid_vote, (&our_index, our_invalid_vote))
} else {
// There is no vote from us yet - nothing to do.
return Ok(())
};
let (valid_index, (kind, signature)) = valid_vote;
let valid_public = info
.session_info
.validators
.get(*valid_index)
.ok_or(JfyiError::InvalidStatementFromCoordinator)?;
let valid_signed = SignedDisputeStatement::new_checked(
DisputeStatement::Valid(*kind),
candidate_hash,
session_index,
valid_public.clone(),
signature.clone(),
)
.map_err(|()| JfyiError::InvalidStatementFromCoordinator)?;
let (invalid_index, (kind, signature)) = invalid_vote;
let invalid_public = info
.session_info
.validators
.get(*invalid_index)
.ok_or(JfyiError::InvalidValidatorIndexFromCoordinator)?;
let invalid_signed = SignedDisputeStatement::new_checked(
DisputeStatement::Invalid(*kind),
candidate_hash,
session_index,
invalid_public.clone(),
signature.clone(),
)
.map_err(|()| JfyiError::InvalidValidatorIndexFromCoordinator)?;
// Reconstructing the checked signed dispute statements is hardly useful here and wasteful,
// but I don't want to enable a bypass for the below smart constructor and this code path
// is supposed to be only hit on startup basically.
//
// Revisit this decision when the `from_signed_statements` is unneeded for the normal code
// path as well.
let message = DisputeMessage::from_signed_statements(
valid_signed,
*valid_index,
invalid_signed,
*invalid_index,
votes.candidate_receipt,
&info.session_info,
)
.map_err(JfyiError::InvalidDisputeFromCoordinator)?;
// Finally, get the party started:
self.start_sender(ctx, runtime, message).await
}
/// Make active sessions correspond to currently active heads.
///
/// Returns: true if sessions changed.
@@ -431,33 +372,14 @@ async fn get_active_session_indices<Context>(
}
/// Retrieve Set of active disputes from the dispute coordinator.
#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
async fn get_active_disputes<Context>(
ctx: &mut Context,
) -> JfyiErrorResult<Vec<(SessionIndex, CandidateHash)>> {
async fn get_active_disputes<Sender>(
sender: &mut Sender,
) -> JfyiErrorResult<Vec<(SessionIndex, CandidateHash, DisputeStatus)>>
where
Sender: SubsystemSender<DisputeCoordinatorMessage>,
{
let (tx, rx) = oneshot::channel();
// Caller scope is in `update_leaves` and this is bounded by fork count.
ctx.send_unbounded_message(DisputeCoordinatorMessage::ActiveDisputes(tx));
rx.await
.map_err(|_| JfyiError::AskActiveDisputesCanceled)
.map(|disputes| disputes.into_iter().map(|d| (d.0, d.1)).collect())
}
/// Get all locally available dispute votes for a given dispute.
#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
async fn get_candidate_votes<Context>(
ctx: &mut Context,
session_index: SessionIndex,
candidate_hash: CandidateHash,
) -> JfyiErrorResult<Option<CandidateVotes>> {
let (tx, rx) = oneshot::channel();
// Caller scope is in `update_leaves` and this is bounded by fork count.
ctx.send_unbounded_message(DisputeCoordinatorMessage::QueryCandidateVotes(
vec![(session_index, candidate_hash)],
tx,
));
rx.await
.map(|v| v.get(0).map(|inner| inner.to_owned().2))
.map_err(|_| JfyiError::AskCandidateVotesCanceled)
sender.send_message(DisputeCoordinatorMessage::ActiveDisputes(tx)).await;
rx.await.map_err(|_| JfyiError::AskActiveDisputesCanceled)
}
@@ -16,7 +16,7 @@
use std::collections::{HashMap, HashSet};
use futures::{channel::mpsc, future::RemoteHandle, Future, FutureExt, SinkExt};
use futures::{future::RemoteHandle, Future, FutureExt};
use polkadot_node_network_protocol::{
request_response::{
@@ -27,7 +27,7 @@ use polkadot_node_network_protocol::{
IfDisconnected,
};
use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, overseer};
use polkadot_node_subsystem_util::{metrics, runtime::RuntimeInfo};
use polkadot_node_subsystem_util::{metrics, nesting_sender::NestingSender, runtime::RuntimeInfo};
use polkadot_primitives::v2::{
AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, ValidatorIndex,
};
@@ -44,7 +44,7 @@ use crate::{
/// Keeps track of all the validators that have to be reached for a dispute.
///
/// The unit of work for a `SendTask` is an authority/validator.
pub struct SendTask {
pub struct SendTask<M> {
/// The request we are supposed to get out to all `parachain` validators of the dispute's session
/// and to all current authorities.
request: DisputeRequest,
@@ -58,7 +58,7 @@ pub struct SendTask {
has_failed_sends: bool,
/// Sender to be cloned for tasks.
tx: mpsc::Sender<TaskFinish>,
tx: NestingSender<M, TaskFinish>,
}
/// Status of a particular vote/statement delivery to a particular validator.
@@ -100,7 +100,7 @@ impl TaskResult {
}
#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
impl SendTask {
impl<M: 'static + Send + Sync> SendTask<M> {
/// Initiates sending a dispute message to peers.
///
/// Creation of new `SendTask`s is subject to rate limiting. As each `SendTask` will trigger
@@ -110,7 +110,7 @@ impl SendTask {
ctx: &mut Context,
runtime: &mut RuntimeInfo,
active_sessions: &HashMap<SessionIndex, Hash>,
tx: mpsc::Sender<TaskFinish>,
tx: NestingSender<M, TaskFinish>,
request: DisputeRequest,
metrics: &Metrics,
) -> Result<Self> {
@@ -272,9 +272,9 @@ impl SendTask {
///
/// And spawn tasks for handling the response.
#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
async fn send_requests<Context>(
async fn send_requests<Context, M: 'static + Send + Sync>(
ctx: &mut Context,
tx: mpsc::Sender<TaskFinish>,
tx: NestingSender<M, TaskFinish>,
receivers: Vec<AuthorityDiscoveryId>,
req: DisputeRequest,
metrics: &Metrics,
@@ -307,11 +307,11 @@ async fn send_requests<Context>(
}
/// Future to be spawned in a task for awaiting a response.
async fn wait_response_task(
async fn wait_response_task<M: 'static + Send + Sync>(
pending_response: impl Future<Output = OutgoingResult<DisputeResponse>>,
candidate_hash: CandidateHash,
receiver: AuthorityDiscoveryId,
mut tx: mpsc::Sender<TaskFinish>,
mut tx: NestingSender<M, TaskFinish>,
_timer: Option<metrics::prometheus::prometheus::HistogramTimer>,
) {
let result = pending_response.await;
@@ -320,7 +320,7 @@ async fn wait_response_task(
Ok(DisputeResponse::Confirmed) =>
TaskFinish { candidate_hash, receiver, result: TaskResult::Succeeded },
};
if let Err(err) = tx.feed(msg).await {
if let Err(err) = tx.send_message(msg).await {
gum::debug!(
target: LOG_TARGET,
%err,
@@ -45,7 +45,7 @@ use polkadot_node_network_protocol::{
request_response::{v1::DisputeResponse, Recipient, Requests},
IfDisconnected,
};
use polkadot_node_primitives::{CandidateVotes, DisputeStatus, UncheckedDisputeMessage};
use polkadot_node_primitives::DisputeStatus;
use polkadot_node_subsystem::{
messages::{
AllMessages, DisputeCoordinatorMessage, DisputeDistributionMessage, ImportStatementsResult,
@@ -479,65 +479,6 @@ fn receive_rate_limit_is_enforced() {
test_harness(test);
}
#[test]
fn disputes_are_recovered_at_startup() {
let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>, _| async move {
let relay_parent = Hash::random();
let candidate = make_candidate_receipt(relay_parent);
let _ = handle_subsystem_startup(&mut handle, Some(candidate.hash())).await;
let message = make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX).await;
// Requests needed session info:
assert_matches!(
handle.recv().await,
AllMessages::DisputeCoordinator(
DisputeCoordinatorMessage::QueryCandidateVotes(
query,
tx,
)
) => {
let (session_index, candidate_hash) = query.get(0).unwrap().clone();
assert_eq!(session_index, MOCK_SESSION_INDEX);
assert_eq!(candidate_hash, candidate.hash());
let unchecked: UncheckedDisputeMessage = message.into();
tx.send(vec![(session_index, candidate_hash, CandidateVotes {
candidate_receipt: candidate,
valid: [(
unchecked.valid_vote.validator_index,
(unchecked.valid_vote.kind,
unchecked.valid_vote.signature
),
)].into_iter().collect(),
invalid: [(
unchecked.invalid_vote.validator_index,
(
unchecked.invalid_vote.kind,
unchecked.invalid_vote.signature
),
)].into_iter().collect(),
})])
.expect("Receiver should stay alive.");
}
);
let expected_receivers = {
let info = &MOCK_SESSION_INFO;
info.discovery_keys
.clone()
.into_iter()
.filter(|a| a != &Sr25519Keyring::Ferdie.public().into())
.collect()
// All validators are also authorities in the first session, so we are
// done here.
};
check_sent_requests(&mut handle, expected_receivers, true).await;
conclude(&mut handle).await;
};
test_harness(test);
}
#[test]
fn send_dispute_gets_cleaned_up() {
let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>, _| async move {
@@ -605,6 +546,7 @@ fn send_dispute_gets_cleaned_up() {
#[test]
fn dispute_retries_and_works_across_session_boundaries() {
sp_tracing::try_init_simple();
let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>, _| async move {
let old_head = handle_subsystem_startup(&mut handle, None).await;
@@ -28,7 +28,7 @@ use polkadot_primitives::v2::{
CandidateReceipt, DisputeStatement, SessionIndex, SessionInfo, ValidatorIndex,
};
/// A dispute initiating/participating message that is guaranteed to have been built from signed
/// A dispute initiating/participating message that have been built from signed
/// statements.
///
/// And most likely has been constructed correctly. This is used with
+1
View File
@@ -8,6 +8,7 @@ edition.workspace = true
[dependencies]
async-trait = "0.1.57"
futures = "0.3.21"
futures-channel = "0.3.23"
itertools = "0.10"
parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] }
parking_lot = "0.11.2"
+6
View File
@@ -72,6 +72,12 @@ pub mod runtime;
/// Database trait for subsystem.
pub mod database;
/// Nested message sending
///
/// Useful for having mostly synchronous code, with submodules spawning short lived asynchronous
/// tasks, sending messages back.
pub mod nesting_sender;
mod determine_new_blocks;
#[cfg(test)]
@@ -0,0 +1,207 @@
// Copyright 2022-2023 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! ## Background
//!
//! Writing concurrent and even multithreaded by default is inconvenient and slow: No references
//! hence lots of needless cloning and data duplication, locks, mutexes, ... We should reach
//! for concurrency and parallelism when there is an actual need, not just because we can and it is
//! reasonably safe in Rust.
//!
//! I very much agree with many points in this blog post for example:
//!
//! <https://maciej.codes/2022-06-09-local-async.html>
//!
//! Another very good post by Pierre (Tomaka):
//!
//! <https://tomaka.medium.com/a-look-back-at-asynchronous-rust-d54d63934a1c>
//!
//! ## Architecture
//!
//! This module helps with this in part. It does not break the multithreaded by default approach,
//! but it breaks the `spawn everything` approach. So once you `spawn` you will still be
//! multithreaded by default, despite that for most tasks we spawn (which just wait for network or some
//! message to arrive), that is very much pointless and needless overhead. You will just spawn less in
//! the first place.
//!
//! By default your code is single threaded, except when actually needed:
//! - need to wait for long running synchronous IO (a threaded runtime is actually useful here)
//! - need to wait for some async event (message to arrive)
//! - need to do some hefty CPU bound processing (a thread is required here as well)
//!
//! and it is not acceptable to block the main task for waiting for the result, because we actually
//! really have other things to do or at least need to stay responsive just in case.
//!
//! With the types and traits in this module you can achieve exactly that: You write modules which
//! just execute logic and can call into the functions of other modules - yes we are calling normal
//! functions. For the case a module you are calling into requires an occasional background task,
//! you provide it with a `NestingSender<M, ChildModuleMessage>` that it can pass to any spawned tasks.
//!
//! This way you don't have to spawn a task for each module just for it to be able to handle
//! asynchronous events. The module relies on the using/enclosing code/module to forward it any
//! asynchronous messages in a structured way.
//!
//! What makes this architecture nice is the separation of concerns - at the top you only have to
//! provide a sender and dispatch received messages to the root module - it is completely
//! irrelevant how complex that module is, it might consist of child modules also having the need
//! to spawn and receive messages, which in turn do the same, still the root logic stays unchanged.
//! Everything is isolated to the level where it belongs, while we still keep a single task scope
//! in all non blocking/not CPU intensive parts, which allows us to share data via references for
//! example.
//!
//! Because the wrapping is optional and transparent to the lower modules, each module can also be
//! used at the top directly without any wrapping, e.g. for standalone use or for testing purposes.
//!
//! Checkout the documentation of [`NestingSender`][nesting_sender::NestingSender] below for a basic usage example. For a real
//! world usage I would like to point you to the dispute-distribution subsystem which makes use of
//! this architecture.
//!
//! ## Limitations
//!
//! Nothing is ever for free of course: Each level adds an indirect function call to message
//! sending. which should be cheap enough for most applications, but something to keep in mind. In
//! particular we avoided the use of of async traits, which would have required memory allocations
//! on each send. Also cloning of [`NestingSender`][nesting_sender::NestingSender] is more
//! expensive than cloning a plain mpsc::Sender, the overhead should be negligible though.
//!
//! Further limitations: Because everything is routed to the same channel, it is not possible with
//! this approach to put back pressure on only a single source (as all are the same). If a module
//! has a task that requires this, it indeed has to spawn a long running task which can do the
//! back-pressure on that message source or we make it its own subsystem. This is just one of the
//! situations that justifies the complexity of asynchronism.
use std::{convert::identity, sync::Arc};
use futures::{channel::mpsc, SinkExt};
/// A message sender that supports sending nested messages.
///
/// This sender wraps an `mpsc::Sender` and a conversion function for converting given messages of
/// type `Mnested` to the message type actually supported by the mpsc (`M`).
///
/// Example:
///
/// ```rust
/// # use polkadot_node_subsystem_util::nesting_sender::NestingSender;
///
/// enum RootMessage {
/// Child1Message(ChildMessage),
/// Child2Message(OtherChildMessage),
/// SomeOwnMessage,
/// }
///
/// enum ChildMessage {
/// TaskFinished(u32),
/// }
///
/// enum OtherChildMessage {
/// QueryResult(bool),
/// }
///
/// // We would then pass in a `NestingSender` to our child module of the following type:
/// type ChildSender = NestingSender<RootMessage, ChildMessage>;
///
/// // Types in the child module can (and should) be generic over the root type:
/// struct ChildState<M> {
/// tx: NestingSender<M, ChildMessage>,
/// }
///
///
/// // Create the root message sender:
///
/// let (root_sender, receiver) = NestingSender::new_root(1);
/// // Get a sender for the child module based on that root sender:
/// let child_sender = NestingSender::new(root_sender.clone(), RootMessage::Child1Message);
/// // pass `child_sender` to child module ...
/// ```
///
/// `ChildMessage` could itself have a constructor with messages of a child of its own and can use
/// `NestingSender::new` with its own sender and a conversion function to provide a further nested
/// sender, suitable for the child module.
pub struct NestingSender<M, Mnested> {
sender: mpsc::Sender<M>,
conversion: Arc<dyn Fn(Mnested) -> M + 'static + Send + Sync>,
}
impl<M> NestingSender<M, M>
where
M: 'static,
{
/// Create a new "root" sender.
///
/// This is a sender that directly passes messages to the internal mpsc.
///
/// Params: The channel size of the created mpsc.
/// Returns: The newly constructed `NestingSender` and the corresponding mpsc receiver.
pub fn new_root(channel_size: usize) -> (Self, mpsc::Receiver<M>) {
let (sender, receiver) = mpsc::channel(channel_size);
let s = Self { sender, conversion: Arc::new(identity) };
(s, receiver)
}
}
impl<M, Mnested> NestingSender<M, Mnested>
where
M: 'static,
Mnested: 'static,
{
/// Create a new `NestingSender` which wraps a given "parent" sender.
///
/// By passing in a necessary conversion from `Mnested` to `Mparent` (the `Mnested` of the
/// parent sender), we can construct a derived `NestingSender<M, Mnested>` from a
/// `NestingSender<M, Mparent>`.
///
/// Resulting sender does the following conversion:
///
/// ```text
/// Mnested -> Mparent -> M
/// Inputs:
/// F(Mparent) -> M (via parent)
/// F(Mnested) -> Mparent (via child_conversion)
/// Result: F(Mnested) -> M
/// ```
pub fn new<Mparent>(
parent: NestingSender<M, Mparent>,
child_conversion: fn(Mnested) -> Mparent,
) -> Self
where
Mparent: 'static,
{
let NestingSender { sender, conversion } = parent;
Self { sender, conversion: Arc::new(move |x| conversion(child_conversion(x))) }
}
/// Send a message via the underlying mpsc.
///
/// Necessary conversion is accomplished.
pub async fn send_message(&mut self, m: Mnested) -> Result<(), mpsc::SendError> {
// Flushing on an mpsc means to wait for the receiver to pick up the data - we don't want
// to wait for that.
self.sender.feed((self.conversion)(m)).await
}
}
// Helper traits and implementations:
impl<M, M1> Clone for NestingSender<M, M1>
where
M: 'static,
M1: 'static,
{
fn clone(&self) -> Self {
Self { sender: self.sender.clone(), conversion: self.conversion.clone() }
}
}