mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 20:31:13 +00:00
Reduce dispute coordinator load (#5785)
* Don't import backing statements directly into the dispute coordinator. This also gets rid of a redundant signature check. Both should have some impact on backing performance. In general this PR should make us scale better in the number of parachains. Reasoning (aka why this is fine): For the signature check: As mentioned, it is a redundant check. The signature has already been checked at this point. This is even made obvious by the used types. The smart constructor is not perfect as discussed [here](https://github.com/paritytech/polkadot/issues/3455), but is still a reasonable security. For not importing to the dispute-coordinator: This should be good as the dispute coordinator does scrape backing votes from chain. This suffices in practice as a super majority of validators must have seen a backing fork in order for a candidate to get included and only included candidates pose a threat to our system. The import from chain is preferable over direct import of backing votes for two reasons: 1. The import is batched, greatly improving import performance. All backing votes for a candidate are imported with a single import. And indeed we were able to see in metrics that importing votes from chain is fast. 2. We do less work in general as not every candidate for which statements are gossiped might actually make it on a chain. The dispute coordinator as with the current implementation would still import and keep those votes around for six sessions. While redundancy is good for reliability in the event of bugs, this also comes at a non negligible cost. The dispute-coordinator right now is the subsystem with the highest load, despite the fact that it should not be doing much during mormal operation and it is only getting worse with more parachains as the load is a direct function of the number of statements. We'll see on Versi how much of a performance improvement this PR * Get rid of dead code. * Dont send approval vote * Make it pass CI * Bring back tests for fixing them later. * Explicit signature check. * Resurrect approval-voting tests (not fixed yet) * Send out approval votes in dispute-distribution. Use BTreeMap for ordered dispute votes. * Bring back an important warning. * Fix approval voting tests. * Don't send out dispute message on import + test + Some cleanup. * Guide changes. Note that the introduced complexity is actually redundant. * WIP: guide changes. * Finish guide changes about dispute-coordinator conceputally. Requires more proof read still. Also removed obsolete implementation details, where the code is better suited as the source of truth. * Finish guide changes for now. * Remove own approval vote import logic. * Implement logic for retrieving approval-votes into approval-voting and approval-distribution subsystems. * Update roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md Co-authored-by: asynchronous rob <rphmeier@gmail.com> * Review feedback. In particular: Add note about disputes of non included candidates. * Incorporate Review Remarks * Get rid of superfluous space. * Tidy up import logic a bit. Logical vote import is now separated, making the code more readable and maintainable. Also: Accept import if there is at least one invalid signer that has not exceeded its spam slots, instead of requiring all of them to not exceed their limits. This is more correct and a preparation for vote batching. * We don't need/have empty imports. * Fix tests and bugs. * Remove error prone redundancy. * Import approval votes on dispute initiated/concluded. * Add test for approval vote import. * Make guide checker happy (hopefully) * Another sanity check + better logs. * Reasoning about boundedness. * Use `CandidateIndex` as opposed to `CoreIndex`. * Remove redundant import. * Review remarks. * Add metric for calls to request signatures * More review remarks. * Add metric on imported approval votes. * Include candidate hash in logs. * More trace log * Break cycle. * Add some tracing. * Cleanup allowed messages. * fmt * Tracing + timeout for get inherent data. * Better error. * Break cycle in all places. * Clarified comment some more. * Typo. * Break cycle approval-distribution - approval-voting. Co-authored-by: asynchronous rob <rphmeier@gmail.com>
This commit is contained in:
@@ -26,7 +26,7 @@ use polkadot_node_primitives::{
|
||||
approval::{
|
||||
BlockApprovalMeta, DelayTranche, IndirectAssignmentCert, IndirectSignedApprovalVote,
|
||||
},
|
||||
SignedDisputeStatement, ValidationResult, APPROVAL_EXECUTION_TIMEOUT,
|
||||
ValidationResult, APPROVAL_EXECUTION_TIMEOUT,
|
||||
};
|
||||
use polkadot_node_subsystem::{
|
||||
errors::RecoveryError,
|
||||
@@ -99,6 +99,11 @@ mod tests;
|
||||
pub const APPROVAL_SESSIONS: SessionWindowSize = new_session_window_size!(6);
|
||||
|
||||
const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
|
||||
/// How long are we willing to wait for approval signatures?
|
||||
///
|
||||
/// Value rather arbitrarily: Should not be hit in practice, it exists to more easily diagnose dead
|
||||
/// lock issues for example.
|
||||
const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
|
||||
const APPROVAL_CACHE_SIZE: usize = 1024;
|
||||
const TICK_TOO_FAR_IN_FUTURE: Tick = 20; // 10 seconds.
|
||||
const APPROVAL_DELAY: Tick = 2;
|
||||
@@ -152,6 +157,7 @@ struct MetricsInner {
|
||||
block_approval_time_ticks: prometheus::Histogram,
|
||||
time_db_transaction: prometheus::Histogram,
|
||||
time_recover_and_approve: prometheus::Histogram,
|
||||
candidate_signatures_requests_total: prometheus::Counter<prometheus::U64>,
|
||||
}
|
||||
|
||||
/// Approval Voting metrics.
|
||||
@@ -225,6 +231,12 @@ impl Metrics {
|
||||
}
|
||||
}
|
||||
|
||||
fn on_candidate_signatures_request(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.candidate_signatures_requests_total.inc();
|
||||
}
|
||||
}
|
||||
|
||||
fn time_db_transaction(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
|
||||
self.0.as_ref().map(|metrics| metrics.time_db_transaction.start_timer())
|
||||
}
|
||||
@@ -315,6 +327,13 @@ impl metrics::Metrics for Metrics {
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
candidate_signatures_requests_total: prometheus::register(
|
||||
prometheus::Counter::new(
|
||||
"polkadot_parachain_approval_candidate_signatures_requests_total",
|
||||
"Number of times signatures got requested by other subsystems",
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
};
|
||||
|
||||
Ok(Metrics(Some(metrics)))
|
||||
@@ -691,13 +710,6 @@ enum Action {
|
||||
candidate: CandidateReceipt,
|
||||
backing_group: GroupIndex,
|
||||
},
|
||||
InformDisputeCoordinator {
|
||||
candidate_hash: CandidateHash,
|
||||
candidate_receipt: CandidateReceipt,
|
||||
session: SessionIndex,
|
||||
dispute_statement: SignedDisputeStatement,
|
||||
validator_index: ValidatorIndex,
|
||||
},
|
||||
NoteApprovedInChainSelection(Hash),
|
||||
IssueApproval(CandidateHash, ApprovalVoteRequest),
|
||||
BecomeActive,
|
||||
@@ -957,22 +969,6 @@ async fn handle_actions<Context>(
|
||||
Some(_) => {},
|
||||
}
|
||||
},
|
||||
Action::InformDisputeCoordinator {
|
||||
candidate_hash,
|
||||
candidate_receipt,
|
||||
session,
|
||||
dispute_statement,
|
||||
validator_index,
|
||||
} => {
|
||||
ctx.send_message(DisputeCoordinatorMessage::ImportStatements {
|
||||
candidate_hash,
|
||||
candidate_receipt,
|
||||
session,
|
||||
statements: vec![(dispute_statement, validator_index)],
|
||||
pending_confirmation: None,
|
||||
})
|
||||
.await;
|
||||
},
|
||||
Action::NoteApprovedInChainSelection(block_hash) => {
|
||||
ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await;
|
||||
},
|
||||
@@ -1192,12 +1188,94 @@ async fn handle_from_overseer<Context>(
|
||||
|
||||
Vec::new()
|
||||
},
|
||||
ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate_hash, tx) => {
|
||||
metrics.on_candidate_signatures_request();
|
||||
get_approval_signatures_for_candidate(ctx, db, candidate_hash, tx).await?;
|
||||
Vec::new()
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
Ok(actions)
|
||||
}
|
||||
|
||||
/// Retrieve approval signatures.
|
||||
///
|
||||
/// This involves an unbounded message send to approval-distribution, the caller has to ensure that
|
||||
/// calls to this function are infrequent and bounded.
|
||||
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
|
||||
async fn get_approval_signatures_for_candidate<Context>(
|
||||
ctx: &mut Context,
|
||||
db: &OverlayedBackend<'_, impl Backend>,
|
||||
candidate_hash: CandidateHash,
|
||||
tx: oneshot::Sender<HashMap<ValidatorIndex, ValidatorSignature>>,
|
||||
) -> SubsystemResult<()> {
|
||||
let entry = match db.load_candidate_entry(&candidate_hash)? {
|
||||
None => return Ok(()),
|
||||
Some(e) => e,
|
||||
};
|
||||
|
||||
let relay_hashes = entry.block_assignments.iter().map(|(relay_hash, _)| relay_hash);
|
||||
|
||||
let mut candidate_indices = HashSet::new();
|
||||
// Retrieve `CoreIndices`/`CandidateIndices` as required by approval-distribution:
|
||||
for hash in relay_hashes {
|
||||
let entry = match db.load_block_entry(hash)? {
|
||||
None => {
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?hash,
|
||||
"Block entry for assignment missing."
|
||||
);
|
||||
continue
|
||||
},
|
||||
Some(e) => e,
|
||||
};
|
||||
for (candidate_index, (_core_index, c_hash)) in entry.candidates().iter().enumerate() {
|
||||
if c_hash == &candidate_hash {
|
||||
candidate_indices.insert((*hash, candidate_index as u32));
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut sender = ctx.sender().clone();
|
||||
let get_approvals = async move {
|
||||
let (tx_distribution, rx_distribution) = oneshot::channel();
|
||||
sender.send_unbounded_message(ApprovalDistributionMessage::GetApprovalSignatures(
|
||||
candidate_indices,
|
||||
tx_distribution,
|
||||
));
|
||||
|
||||
// Because of the unbounded sending and the nature of the call (just fetching data from state),
|
||||
// this should not block long:
|
||||
match rx_distribution.timeout(WAIT_FOR_SIGS_TIMEOUT).await {
|
||||
None => {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Waiting for approval signatures timed out - dead lock?"
|
||||
);
|
||||
},
|
||||
Some(Err(_)) => gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
"Request for approval signatures got cancelled by `approval-distribution`."
|
||||
),
|
||||
Some(Ok(votes)) =>
|
||||
if let Err(_) = tx.send(votes) {
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
"Sending approval signatures back failed, as receiver got closed"
|
||||
);
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
// No need to block subsystem on this (also required to break cycle).
|
||||
// We should not be sending this message frequently - caller must make sure this is bounded.
|
||||
ctx.spawn("get-approval-signatures", Box::pin(get_approvals))
|
||||
}
|
||||
|
||||
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
|
||||
async fn handle_approved_ancestor<Context>(
|
||||
ctx: &mut Context,
|
||||
@@ -1717,19 +1795,17 @@ fn check_and_import_approval<T>(
|
||||
)),
|
||||
};
|
||||
|
||||
// Transform the approval vote into the wrapper used to import statements into disputes.
|
||||
// This also does signature checking.
|
||||
let signed_dispute_statement = match SignedDisputeStatement::new_checked(
|
||||
DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking),
|
||||
// Signature check:
|
||||
match DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking).check_signature(
|
||||
&pubkey,
|
||||
approved_candidate_hash,
|
||||
block_entry.session(),
|
||||
pubkey.clone(),
|
||||
approval.signature.clone(),
|
||||
&approval.signature,
|
||||
) {
|
||||
Err(_) => respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::InvalidSignature(
|
||||
approval.validator
|
||||
),)),
|
||||
Ok(s) => s,
|
||||
Ok(()) => {},
|
||||
};
|
||||
|
||||
let candidate_entry = match db.load_candidate_entry(&approved_candidate_hash)? {
|
||||
@@ -1770,23 +1846,7 @@ fn check_and_import_approval<T>(
|
||||
"Importing approval vote",
|
||||
);
|
||||
|
||||
let inform_disputes_action = if !candidate_entry.has_approved(approval.validator) {
|
||||
// The approval voting system requires a separate approval for each assignment
|
||||
// to the candidate. It's possible that there are semi-duplicate approvals,
|
||||
// but we only need to inform the dispute coordinator about the first expressed
|
||||
// opinion by the validator about the candidate.
|
||||
Some(Action::InformDisputeCoordinator {
|
||||
candidate_hash: approved_candidate_hash,
|
||||
candidate_receipt: candidate_entry.candidate_receipt().clone(),
|
||||
session: block_entry.session(),
|
||||
dispute_statement: signed_dispute_statement,
|
||||
validator_index: approval.validator,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let mut actions = advance_approval_state(
|
||||
let actions = advance_approval_state(
|
||||
state,
|
||||
db,
|
||||
&metrics,
|
||||
@@ -1796,8 +1856,6 @@ fn check_and_import_approval<T>(
|
||||
ApprovalStateTransition::RemoteApproval(approval.validator),
|
||||
);
|
||||
|
||||
actions.extend(inform_disputes_action);
|
||||
|
||||
Ok((actions, t))
|
||||
}
|
||||
|
||||
@@ -2242,15 +2300,12 @@ async fn launch_approval<Context>(
|
||||
"Data recovery invalid for candidate {:?}",
|
||||
(candidate_hash, candidate.descriptor.para_id),
|
||||
);
|
||||
|
||||
sender
|
||||
.send_message(DisputeCoordinatorMessage::IssueLocalStatement(
|
||||
session_index,
|
||||
candidate_hash,
|
||||
candidate.clone(),
|
||||
false,
|
||||
))
|
||||
.await;
|
||||
issue_local_invalid_statement(
|
||||
&mut sender,
|
||||
session_index,
|
||||
candidate_hash,
|
||||
candidate.clone(),
|
||||
);
|
||||
metrics_guard.take().on_approval_invalid();
|
||||
},
|
||||
}
|
||||
@@ -2304,14 +2359,12 @@ async fn launch_approval<Context>(
|
||||
return ApprovalState::approved(validator_index, candidate_hash)
|
||||
} else {
|
||||
// Commitments mismatch - issue a dispute.
|
||||
sender
|
||||
.send_message(DisputeCoordinatorMessage::IssueLocalStatement(
|
||||
session_index,
|
||||
candidate_hash,
|
||||
candidate.clone(),
|
||||
false,
|
||||
))
|
||||
.await;
|
||||
issue_local_invalid_statement(
|
||||
&mut sender,
|
||||
session_index,
|
||||
candidate_hash,
|
||||
candidate.clone(),
|
||||
);
|
||||
|
||||
metrics_guard.take().on_approval_invalid();
|
||||
return ApprovalState::failed(validator_index, candidate_hash)
|
||||
@@ -2326,14 +2379,12 @@ async fn launch_approval<Context>(
|
||||
"Detected invalid candidate as an approval checker.",
|
||||
);
|
||||
|
||||
sender
|
||||
.send_message(DisputeCoordinatorMessage::IssueLocalStatement(
|
||||
session_index,
|
||||
candidate_hash,
|
||||
candidate.clone(),
|
||||
false,
|
||||
))
|
||||
.await;
|
||||
issue_local_invalid_statement(
|
||||
&mut sender,
|
||||
session_index,
|
||||
candidate_hash,
|
||||
candidate.clone(),
|
||||
);
|
||||
|
||||
metrics_guard.take().on_approval_invalid();
|
||||
return ApprovalState::failed(validator_index, candidate_hash)
|
||||
@@ -2468,17 +2519,6 @@ async fn issue_approval<Context>(
|
||||
},
|
||||
};
|
||||
|
||||
// Record our statement in the dispute coordinator for later
|
||||
// participation in disputes on the same candidate.
|
||||
let signed_dispute_statement = SignedDisputeStatement::new_checked(
|
||||
DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking),
|
||||
candidate_hash,
|
||||
session,
|
||||
validator_pubkey.clone(),
|
||||
sig.clone(),
|
||||
)
|
||||
.expect("Statement just signed; should pass checks; qed");
|
||||
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
@@ -2487,25 +2527,7 @@ async fn issue_approval<Context>(
|
||||
"Issuing approval vote",
|
||||
);
|
||||
|
||||
let candidate_receipt = candidate_entry.candidate_receipt().clone();
|
||||
|
||||
let inform_disputes_action = if candidate_entry.has_approved(validator_index) {
|
||||
// The approval voting system requires a separate approval for each assignment
|
||||
// to the candidate. It's possible that there are semi-duplicate approvals,
|
||||
// but we only need to inform the dispute coordinator about the first expressed
|
||||
// opinion by the validator about the candidate.
|
||||
Some(Action::InformDisputeCoordinator {
|
||||
candidate_hash,
|
||||
candidate_receipt,
|
||||
session,
|
||||
dispute_statement: signed_dispute_statement,
|
||||
validator_index,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let mut actions = advance_approval_state(
|
||||
let actions = advance_approval_state(
|
||||
state,
|
||||
db,
|
||||
metrics,
|
||||
@@ -2527,9 +2549,6 @@ async fn issue_approval<Context>(
|
||||
},
|
||||
));
|
||||
|
||||
// dispatch to dispute coordinator.
|
||||
actions.extend(inform_disputes_action);
|
||||
|
||||
Ok(actions)
|
||||
}
|
||||
|
||||
@@ -2546,3 +2565,29 @@ fn sign_approval(
|
||||
|
||||
Some(key.sign(&payload[..]))
|
||||
}
|
||||
|
||||
/// Send `IssueLocalStatement` to dispute-coordinator.
|
||||
fn issue_local_invalid_statement<Sender>(
|
||||
sender: &mut Sender,
|
||||
session_index: SessionIndex,
|
||||
candidate_hash: CandidateHash,
|
||||
candidate: CandidateReceipt,
|
||||
) where
|
||||
Sender: overseer::ApprovalVotingSenderTrait,
|
||||
{
|
||||
// We need to send an unbounded message here to break a cycle:
|
||||
// DisputeCoordinatorMessage::IssueLocalStatement ->
|
||||
// ApprovalVotingMessage::GetApprovalSignaturesForCandidate.
|
||||
//
|
||||
// Use of unbounded _should_ be fine here as raising a dispute should be an
|
||||
// exceptional event. Even in case of bugs: There can be no more than
|
||||
// number of slots per block requests every block. Also for sending this
|
||||
// message a full recovery and validation procedure took place, which takes
|
||||
// longer than issuing a local statement + import.
|
||||
sender.send_unbounded_message(DisputeCoordinatorMessage::IssueLocalStatement(
|
||||
session_index,
|
||||
candidate_hash,
|
||||
candidate.clone(),
|
||||
false,
|
||||
));
|
||||
}
|
||||
|
||||
@@ -577,7 +577,6 @@ async fn check_and_import_approval(
|
||||
candidate_hash: CandidateHash,
|
||||
session_index: SessionIndex,
|
||||
expect_chain_approved: bool,
|
||||
expect_coordinator: bool,
|
||||
signature_opt: Option<ValidatorSignature>,
|
||||
) -> oneshot::Receiver<ApprovalCheckResult> {
|
||||
let signature = signature_opt.unwrap_or(sign_approval(
|
||||
@@ -604,18 +603,6 @@ async fn check_and_import_approval(
|
||||
}
|
||||
);
|
||||
}
|
||||
if expect_coordinator {
|
||||
assert_matches!(
|
||||
overseer_recv(overseer).await,
|
||||
AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ImportStatements {
|
||||
candidate_hash: c_hash,
|
||||
pending_confirmation: None,
|
||||
..
|
||||
}) => {
|
||||
assert_eq!(c_hash, candidate_hash);
|
||||
}
|
||||
);
|
||||
}
|
||||
rx
|
||||
}
|
||||
|
||||
@@ -1160,7 +1147,6 @@ fn subsystem_rejects_approval_if_no_candidate_entry() {
|
||||
candidate_hash,
|
||||
session_index,
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
@@ -1202,7 +1188,6 @@ fn subsystem_rejects_approval_if_no_block_entry() {
|
||||
candidate_hash,
|
||||
session_index,
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
@@ -1263,7 +1248,6 @@ fn subsystem_rejects_approval_before_assignment() {
|
||||
candidate_hash,
|
||||
session_index,
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
@@ -1488,7 +1472,6 @@ fn subsystem_accepts_and_imports_approval_after_assignment() {
|
||||
candidate_hash,
|
||||
session_index,
|
||||
true,
|
||||
true,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
@@ -1582,7 +1565,6 @@ fn subsystem_second_approval_import_only_schedules_wakeups() {
|
||||
candidate_hash,
|
||||
session_index,
|
||||
false,
|
||||
true,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
@@ -1600,7 +1582,6 @@ fn subsystem_second_approval_import_only_schedules_wakeups() {
|
||||
candidate_hash,
|
||||
session_index,
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
@@ -1936,7 +1917,6 @@ fn import_checked_approval_updates_entries_and_schedules() {
|
||||
candidate_hash,
|
||||
session_index,
|
||||
false,
|
||||
true,
|
||||
Some(sig_a),
|
||||
)
|
||||
.await;
|
||||
@@ -1964,7 +1944,6 @@ fn import_checked_approval_updates_entries_and_schedules() {
|
||||
candidate_hash,
|
||||
session_index,
|
||||
true,
|
||||
true,
|
||||
Some(sig_b),
|
||||
)
|
||||
.await;
|
||||
@@ -2104,7 +2083,6 @@ fn subsystem_import_checked_approval_sets_one_block_bit_at_a_time() {
|
||||
*candidate_hash,
|
||||
session_index,
|
||||
expect_block_approved,
|
||||
true,
|
||||
Some(signature),
|
||||
)
|
||||
.await;
|
||||
@@ -2208,7 +2186,6 @@ fn approved_ancestor_test(
|
||||
candidate_hash,
|
||||
i as u32 + 1,
|
||||
true,
|
||||
true,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
@@ -2586,7 +2563,6 @@ where
|
||||
candidate_hash,
|
||||
1,
|
||||
expect_chain_approved,
|
||||
true,
|
||||
Some(sign_approval(
|
||||
validators[validator_index as usize].clone(),
|
||||
candidate_hash,
|
||||
@@ -2930,7 +2906,6 @@ fn pre_covers_dont_stall_approval() {
|
||||
candidate_hash,
|
||||
session_index,
|
||||
false,
|
||||
true,
|
||||
Some(sig_b),
|
||||
)
|
||||
.await;
|
||||
@@ -2946,7 +2921,6 @@ fn pre_covers_dont_stall_approval() {
|
||||
candidate_hash,
|
||||
session_index,
|
||||
false,
|
||||
true,
|
||||
Some(sig_c),
|
||||
)
|
||||
.await;
|
||||
@@ -3101,7 +3075,6 @@ fn waits_until_approving_assignments_are_old_enough() {
|
||||
candidate_hash,
|
||||
session_index,
|
||||
false,
|
||||
true,
|
||||
Some(sig_a),
|
||||
)
|
||||
.await;
|
||||
@@ -3118,7 +3091,6 @@ fn waits_until_approving_assignments_are_old_enough() {
|
||||
candidate_hash,
|
||||
session_index,
|
||||
false,
|
||||
true,
|
||||
Some(sig_b),
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -31,15 +31,15 @@ use futures::{
|
||||
|
||||
use error::{Error, FatalResult};
|
||||
use polkadot_node_primitives::{
|
||||
AvailableData, InvalidCandidate, PoV, SignedDisputeStatement, SignedFullStatement, Statement,
|
||||
ValidationResult, BACKING_EXECUTION_TIMEOUT,
|
||||
AvailableData, InvalidCandidate, PoV, SignedFullStatement, Statement, ValidationResult,
|
||||
BACKING_EXECUTION_TIMEOUT,
|
||||
};
|
||||
use polkadot_node_subsystem::{
|
||||
jaeger,
|
||||
messages::{
|
||||
AvailabilityDistributionMessage, AvailabilityStoreMessage, CandidateBackingMessage,
|
||||
CandidateValidationMessage, CollatorProtocolMessage, DisputeCoordinatorMessage,
|
||||
ProvisionableData, ProvisionerMessage, RuntimeApiRequest, StatementDistributionMessage,
|
||||
CandidateValidationMessage, CollatorProtocolMessage, ProvisionableData, ProvisionerMessage,
|
||||
RuntimeApiRequest, StatementDistributionMessage,
|
||||
},
|
||||
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, PerLeafSpan, SpawnedSubsystem,
|
||||
Stage, SubsystemError,
|
||||
@@ -50,8 +50,8 @@ use polkadot_node_subsystem_util::{
|
||||
};
|
||||
use polkadot_primitives::v2::{
|
||||
BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceipt, CollatorId,
|
||||
CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId, SessionIndex,
|
||||
SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
|
||||
CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId, SigningContext,
|
||||
ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
|
||||
};
|
||||
use sp_keystore::SyncCryptoStorePtr;
|
||||
use statement_table::{
|
||||
@@ -380,7 +380,6 @@ async fn handle_active_leaves_update<Context>(
|
||||
|
||||
let job = CandidateBackingJob {
|
||||
parent,
|
||||
session_index,
|
||||
assignment,
|
||||
required_collator,
|
||||
issued_statements: HashSet::new(),
|
||||
@@ -411,8 +410,6 @@ struct JobAndSpan<Context> {
|
||||
struct CandidateBackingJob<Context> {
|
||||
/// The hash of the relay parent on top of which this job is doing it's work.
|
||||
parent: Hash,
|
||||
/// The session index this corresponds to.
|
||||
session_index: SessionIndex,
|
||||
/// The `ParaId` assigned to this validator
|
||||
assignment: Option<ParaId>,
|
||||
/// The collator required to author the candidate, if any.
|
||||
@@ -783,8 +780,6 @@ async fn validate_and_make_available(
|
||||
tx_command.send((relay_parent, make_command(res))).await.map_err(Into::into)
|
||||
}
|
||||
|
||||
struct ValidatorIndexOutOfBounds;
|
||||
|
||||
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
|
||||
impl<Context> CandidateBackingJob<Context> {
|
||||
async fn handle_validated_candidate_command(
|
||||
@@ -1014,21 +1009,6 @@ impl<Context> CandidateBackingJob<Context> {
|
||||
)
|
||||
};
|
||||
|
||||
if let Err(ValidatorIndexOutOfBounds) = self
|
||||
.dispatch_new_statement_to_dispute_coordinator(ctx.sender(), candidate_hash, &statement)
|
||||
.await
|
||||
{
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
session_index = ?self.session_index,
|
||||
relay_parent = ?self.parent,
|
||||
validator_index = statement.validator_index().0,
|
||||
"Supposedly 'Signed' statement has validator index out of bounds."
|
||||
);
|
||||
|
||||
return Ok(None)
|
||||
}
|
||||
|
||||
let stmt = primitive_statement_to_table(statement);
|
||||
|
||||
let summary = self.table.import_statement(&self.table_context, stmt);
|
||||
@@ -1083,67 +1063,6 @@ impl<Context> CandidateBackingJob<Context> {
|
||||
Ok(summary)
|
||||
}
|
||||
|
||||
/// The dispute coordinator keeps track of all statements by validators about every recent
|
||||
/// candidate.
|
||||
///
|
||||
/// When importing a statement, this should be called access the candidate receipt either
|
||||
/// from the statement itself or from the underlying statement table in order to craft
|
||||
/// and dispatch the notification to the dispute coordinator.
|
||||
///
|
||||
/// This also does bounds-checking on the validator index and will return an error if the
|
||||
/// validator index is out of bounds for the current validator set. It's expected that
|
||||
/// this should never happen due to the interface of the candidate backing subsystem -
|
||||
/// the networking component responsible for feeding statements to the backing subsystem
|
||||
/// is meant to check the signature and provenance of all statements before submission.
|
||||
async fn dispatch_new_statement_to_dispute_coordinator(
|
||||
&self,
|
||||
sender: &mut impl overseer::CandidateBackingSenderTrait,
|
||||
candidate_hash: CandidateHash,
|
||||
statement: &SignedFullStatement,
|
||||
) -> Result<(), ValidatorIndexOutOfBounds> {
|
||||
// Dispatch the statement to the dispute coordinator.
|
||||
let validator_index = statement.validator_index();
|
||||
let signing_context =
|
||||
SigningContext { parent_hash: self.parent, session_index: self.session_index };
|
||||
|
||||
let validator_public = match self.table_context.validators.get(validator_index.0 as usize) {
|
||||
None => return Err(ValidatorIndexOutOfBounds),
|
||||
Some(v) => v,
|
||||
};
|
||||
|
||||
let maybe_candidate_receipt = match statement.payload() {
|
||||
Statement::Seconded(receipt) => Some(receipt.to_plain()),
|
||||
Statement::Valid(candidate_hash) => {
|
||||
// Valid statements are only supposed to be imported
|
||||
// once we've seen at least one `Seconded` statement.
|
||||
self.table.get_candidate(&candidate_hash).map(|c| c.to_plain())
|
||||
},
|
||||
};
|
||||
|
||||
let maybe_signed_dispute_statement = SignedDisputeStatement::from_backing_statement(
|
||||
statement.as_unchecked(),
|
||||
signing_context,
|
||||
validator_public.clone(),
|
||||
)
|
||||
.ok();
|
||||
|
||||
if let (Some(candidate_receipt), Some(dispute_statement)) =
|
||||
(maybe_candidate_receipt, maybe_signed_dispute_statement)
|
||||
{
|
||||
sender
|
||||
.send_message(DisputeCoordinatorMessage::ImportStatements {
|
||||
candidate_hash,
|
||||
candidate_receipt,
|
||||
session: self.session_index,
|
||||
statements: vec![(dispute_statement, validator_index)],
|
||||
pending_confirmation: None,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_second_msg(
|
||||
&mut self,
|
||||
root_span: &jaeger::Span,
|
||||
|
||||
@@ -66,12 +66,6 @@ struct TestState {
|
||||
relay_parent: Hash,
|
||||
}
|
||||
|
||||
impl TestState {
|
||||
fn session(&self) -> SessionIndex {
|
||||
self.signing_context.session_index
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TestState {
|
||||
fn default() -> Self {
|
||||
let chain_a = ParaId::from(1);
|
||||
@@ -273,34 +267,6 @@ async fn test_startup(virtual_overseer: &mut VirtualOverseer, test_state: &TestS
|
||||
);
|
||||
}
|
||||
|
||||
async fn test_dispute_coordinator_notifications(
|
||||
virtual_overseer: &mut VirtualOverseer,
|
||||
candidate_hash: CandidateHash,
|
||||
session: SessionIndex,
|
||||
validator_indices: Vec<ValidatorIndex>,
|
||||
) {
|
||||
for validator_index in validator_indices {
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::DisputeCoordinator(
|
||||
DisputeCoordinatorMessage::ImportStatements {
|
||||
candidate_hash: c_hash,
|
||||
candidate_receipt: c_receipt,
|
||||
session: s,
|
||||
statements,
|
||||
pending_confirmation: None,
|
||||
}
|
||||
) => {
|
||||
assert_eq!(c_hash, candidate_hash);
|
||||
assert_eq!(c_receipt.hash(), c_hash);
|
||||
assert_eq!(s, session);
|
||||
assert_eq!(statements.len(), 1);
|
||||
assert_eq!(statements[0].1, validator_index);
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Test that a `CandidateBackingMessage::Second` issues validation work
|
||||
// and in case validation is successful issues a `StatementDistributionMessage`.
|
||||
#[test]
|
||||
@@ -364,14 +330,6 @@ fn backing_second_works() {
|
||||
}
|
||||
);
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate.hash(),
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(0)],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::StatementDistribution(
|
||||
@@ -469,14 +427,6 @@ fn backing_works() {
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate_a_hash,
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(2)],
|
||||
)
|
||||
.await;
|
||||
|
||||
// Sending a `Statement::Seconded` for our assignment will start
|
||||
// validation process. The first thing requested is the PoV.
|
||||
assert_matches!(
|
||||
@@ -526,14 +476,6 @@ fn backing_works() {
|
||||
}
|
||||
);
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate_a_hash,
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(0)],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::Provisioner(
|
||||
@@ -560,14 +502,6 @@ fn backing_works() {
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate_a_hash,
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(5)],
|
||||
)
|
||||
.await;
|
||||
|
||||
virtual_overseer
|
||||
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::stop_work(test_state.relay_parent),
|
||||
@@ -664,14 +598,6 @@ fn backing_works_while_validation_ongoing() {
|
||||
CandidateBackingMessage::Statement(test_state.relay_parent, signed_a.clone());
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate_a.hash(),
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(2)],
|
||||
)
|
||||
.await;
|
||||
|
||||
// Sending a `Statement::Seconded` for our assignment will start
|
||||
// validation process. The first thing requested is PoV from the
|
||||
// `PoVDistribution`.
|
||||
@@ -711,14 +637,6 @@ fn backing_works_while_validation_ongoing() {
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate_a.hash(),
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(5)],
|
||||
)
|
||||
.await;
|
||||
|
||||
// Candidate gets backed entirely by other votes.
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
@@ -738,14 +656,6 @@ fn backing_works_while_validation_ongoing() {
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate_a.hash(),
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(3)],
|
||||
)
|
||||
.await;
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let msg = CandidateBackingMessage::GetBackedCandidates(
|
||||
test_state.relay_parent,
|
||||
@@ -845,14 +755,6 @@ fn backing_misbehavior_works() {
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate_a_hash,
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(2)],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::AvailabilityDistribution(
|
||||
@@ -898,14 +800,6 @@ fn backing_misbehavior_works() {
|
||||
}
|
||||
);
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate_a_hash,
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(0)],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::Provisioner(
|
||||
@@ -937,14 +831,6 @@ fn backing_misbehavior_works() {
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate_a_hash,
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(2)],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::Provisioner(
|
||||
@@ -1087,14 +973,6 @@ fn backing_dont_second_invalid() {
|
||||
}
|
||||
);
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate_b.hash(),
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(0)],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::StatementDistribution(
|
||||
@@ -1163,14 +1041,6 @@ fn backing_second_after_first_fails_works() {
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate.hash(),
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(2)],
|
||||
)
|
||||
.await;
|
||||
|
||||
// Subsystem requests PoV and requests validation.
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
@@ -1297,14 +1167,6 @@ fn backing_works_after_failed_validation() {
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate.hash(),
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(2)],
|
||||
)
|
||||
.await;
|
||||
|
||||
// Subsystem requests PoV and requests validation.
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
@@ -1615,14 +1477,6 @@ fn retry_works() {
|
||||
CandidateBackingMessage::Statement(test_state.relay_parent, signed_a.clone());
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate.hash(),
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(2)],
|
||||
)
|
||||
.await;
|
||||
|
||||
// Subsystem requests PoV and requests validation.
|
||||
// We cancel - should mean retry on next backing statement.
|
||||
assert_matches!(
|
||||
@@ -1642,14 +1496,6 @@ fn retry_works() {
|
||||
CandidateBackingMessage::Statement(test_state.relay_parent, signed_b.clone());
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate.hash(),
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(3)],
|
||||
)
|
||||
.await;
|
||||
|
||||
// Not deterministic which message comes first:
|
||||
for _ in 0u32..2 {
|
||||
match virtual_overseer.recv().await {
|
||||
@@ -1674,14 +1520,6 @@ fn retry_works() {
|
||||
CandidateBackingMessage::Statement(test_state.relay_parent, signed_c.clone());
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate.hash(),
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(5)],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::AvailabilityDistribution(
|
||||
@@ -1806,14 +1644,6 @@ fn observes_backing_even_if_not_validator() {
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate_a_hash,
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(0), ValidatorIndex(5)],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::Provisioner(
|
||||
@@ -1831,14 +1661,6 @@ fn observes_backing_even_if_not_validator() {
|
||||
|
||||
virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
|
||||
|
||||
test_dispute_coordinator_notifications(
|
||||
&mut virtual_overseer,
|
||||
candidate_a_hash,
|
||||
test_state.session(),
|
||||
vec![ValidatorIndex(2)],
|
||||
)
|
||||
.await;
|
||||
|
||||
virtual_overseer
|
||||
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::stop_work(test_state.relay_parent),
|
||||
|
||||
@@ -30,6 +30,7 @@ sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "maste
|
||||
assert_matches = "1.4.0"
|
||||
test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../../primitives/test-helpers" }
|
||||
futures-timer = "3.0.2"
|
||||
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
|
||||
[features]
|
||||
# If not enabled, the dispute coordinator will do nothing.
|
||||
|
||||
@@ -213,8 +213,8 @@ impl From<CandidateVotes> for polkadot_node_primitives::CandidateVotes {
|
||||
fn from(db_votes: CandidateVotes) -> polkadot_node_primitives::CandidateVotes {
|
||||
polkadot_node_primitives::CandidateVotes {
|
||||
candidate_receipt: db_votes.candidate_receipt,
|
||||
valid: db_votes.valid,
|
||||
invalid: db_votes.invalid,
|
||||
valid: db_votes.valid.into_iter().map(|(kind, i, sig)| (i, (kind, sig))).collect(),
|
||||
invalid: db_votes.invalid.into_iter().map(|(kind, i, sig)| (i, (kind, sig))).collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -223,8 +223,12 @@ impl From<polkadot_node_primitives::CandidateVotes> for CandidateVotes {
|
||||
fn from(primitive_votes: polkadot_node_primitives::CandidateVotes) -> CandidateVotes {
|
||||
CandidateVotes {
|
||||
candidate_receipt: primitive_votes.candidate_receipt,
|
||||
valid: primitive_votes.valid,
|
||||
invalid: primitive_votes.invalid,
|
||||
valid: primitive_votes
|
||||
.valid
|
||||
.into_iter()
|
||||
.map(|(i, (kind, sig))| (kind, i, sig))
|
||||
.collect(),
|
||||
invalid: primitive_votes.invalid.into_iter().map(|(i, (k, sig))| (k, i, sig)).collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -122,7 +122,7 @@ impl JfyiError {
|
||||
pub fn log(self) {
|
||||
match self {
|
||||
// don't spam the log with spurious errors
|
||||
Self::Runtime(_) | Self::Oneshot(_) => {
|
||||
Self::Runtime(runtime::Error::RuntimeRequestCanceled(_)) | Self::Oneshot(_) => {
|
||||
gum::debug!(target: LOG_TARGET, error = ?self)
|
||||
},
|
||||
// it's worth reporting otherwise
|
||||
|
||||
@@ -0,0 +1,564 @@
|
||||
// Copyright 2022 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Vote import logic.
|
||||
//!
|
||||
//! This module encapsulates the actual logic for importing new votes and provides easy access of
|
||||
//! the current state for votes for a particular candidate.
|
||||
//!
|
||||
//! In particular there is `CandidateVoteState` which tells what can be concluded for a particular set of
|
||||
//! votes. E.g. whether a dispute is ongoing, whether it is confirmed, concluded, ..
|
||||
//!
|
||||
//! Then there is `ImportResult` which reveals information about what changed once additional votes
|
||||
//! got imported on top of an existing `CandidateVoteState` and reveals "dynamic" information, like whether
|
||||
//! due to the import a dispute was raised/got confirmed, ...
|
||||
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
|
||||
use polkadot_node_primitives::{CandidateVotes, SignedDisputeStatement};
|
||||
use polkadot_node_subsystem_util::rolling_session_window::RollingSessionWindow;
|
||||
use polkadot_primitives::v2::{
|
||||
CandidateReceipt, DisputeStatement, SessionIndex, SessionInfo, ValidDisputeStatementKind,
|
||||
ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
|
||||
};
|
||||
use sc_keystore::LocalKeystore;
|
||||
|
||||
use crate::LOG_TARGET;
|
||||
|
||||
/// (Session) environment of a candidate.
|
||||
pub struct CandidateEnvironment<'a> {
|
||||
/// The session the candidate appeared in.
|
||||
session_index: SessionIndex,
|
||||
/// Session for above index.
|
||||
session: &'a SessionInfo,
|
||||
/// Validator indices controlled by this node.
|
||||
controlled_indices: HashSet<ValidatorIndex>,
|
||||
}
|
||||
|
||||
impl<'a> CandidateEnvironment<'a> {
|
||||
/// Create `CandidateEnvironment`.
|
||||
///
|
||||
/// Return: `None` in case session is outside of session window.
|
||||
pub fn new(
|
||||
keystore: &LocalKeystore,
|
||||
session_window: &'a RollingSessionWindow,
|
||||
session_index: SessionIndex,
|
||||
) -> Option<Self> {
|
||||
let session = session_window.session_info(session_index)?;
|
||||
let controlled_indices = find_controlled_validator_indices(keystore, &session.validators);
|
||||
Some(Self { session_index, session, controlled_indices })
|
||||
}
|
||||
|
||||
/// Validators in the candidate's session.
|
||||
pub fn validators(&self) -> &Vec<ValidatorId> {
|
||||
&self.session.validators
|
||||
}
|
||||
|
||||
/// `SessionInfo` for the candidate's session.
|
||||
pub fn session_info(&self) -> &SessionInfo {
|
||||
&self.session
|
||||
}
|
||||
|
||||
/// Retrieve `SessionIndex` for this environment.
|
||||
pub fn session_index(&self) -> SessionIndex {
|
||||
self.session_index
|
||||
}
|
||||
|
||||
/// Indices controlled by this node.
|
||||
pub fn controlled_indices(&'a self) -> &'a HashSet<ValidatorIndex> {
|
||||
&self.controlled_indices
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether or not we already issued some statement about a candidate.
|
||||
pub enum OwnVoteState {
|
||||
/// We already voted/issued a statement for the candidate.
|
||||
Voted,
|
||||
/// We already voted/issued a statement for the candidate and it was an approval vote.
|
||||
///
|
||||
/// Needs special treatment as we have to make sure to propagate it to peers, to guarantee the
|
||||
/// dispute can conclude.
|
||||
VotedApproval(Vec<(ValidatorIndex, ValidatorSignature)>),
|
||||
/// We not yet voted for the dispute.
|
||||
NoVote,
|
||||
}
|
||||
|
||||
impl OwnVoteState {
|
||||
fn new<'a>(votes: &CandidateVotes, env: &CandidateEnvironment<'a>) -> Self {
|
||||
let mut our_valid_votes = env
|
||||
.controlled_indices()
|
||||
.iter()
|
||||
.filter_map(|i| votes.valid.get_key_value(i))
|
||||
.peekable();
|
||||
let mut our_invalid_votes =
|
||||
env.controlled_indices.iter().filter_map(|i| votes.invalid.get_key_value(i));
|
||||
let has_valid_votes = our_valid_votes.peek().is_some();
|
||||
let has_invalid_votes = our_invalid_votes.next().is_some();
|
||||
let our_approval_votes: Vec<_> = our_valid_votes
|
||||
.filter_map(|(index, (k, sig))| {
|
||||
if let ValidDisputeStatementKind::ApprovalChecking = k {
|
||||
Some((*index, sig.clone()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
if !our_approval_votes.is_empty() {
|
||||
return Self::VotedApproval(our_approval_votes)
|
||||
}
|
||||
if has_valid_votes || has_invalid_votes {
|
||||
return Self::Voted
|
||||
}
|
||||
Self::NoVote
|
||||
}
|
||||
|
||||
/// Whether or not we issued a statement for the candidate already.
|
||||
fn voted(&self) -> bool {
|
||||
match self {
|
||||
Self::Voted | Self::VotedApproval(_) => true,
|
||||
Self::NoVote => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get own approval votes, if any.
|
||||
fn approval_votes(&self) -> Option<&Vec<(ValidatorIndex, ValidatorSignature)>> {
|
||||
match self {
|
||||
Self::VotedApproval(votes) => Some(&votes),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Complete state of votes for a candidate.
|
||||
///
|
||||
/// All votes + information whether a dispute is ongoing, confirmed, concluded, whether we already
|
||||
/// voted, ...
|
||||
pub struct CandidateVoteState<Votes> {
|
||||
/// Votes already existing for the candidate + receipt.
|
||||
votes: Votes,
|
||||
|
||||
/// Information about own votes:
|
||||
own_vote: OwnVoteState,
|
||||
|
||||
/// Whether or not the dispute concluded invalid.
|
||||
concluded_invalid: bool,
|
||||
|
||||
/// Whether or not the dispute concluded valid.
|
||||
///
|
||||
/// Note: Due to equivocations it is technically possible for a dispute to conclude both valid
|
||||
/// and invalid. In that case the invalid result takes precedence.
|
||||
concluded_valid: bool,
|
||||
|
||||
/// There is an ongoing dispute and we reached f+1 votes -> the dispute is confirmed
|
||||
///
|
||||
/// as at least one honest validator cast a vote for the candidate.
|
||||
is_confirmed: bool,
|
||||
|
||||
/// Whether or not we have an ongoing dispute.
|
||||
is_disputed: bool,
|
||||
}
|
||||
|
||||
impl CandidateVoteState<CandidateVotes> {
|
||||
/// Create an empty `CandidateVoteState`
|
||||
///
|
||||
/// in case there have not been any previous votes.
|
||||
pub fn new_from_receipt(candidate_receipt: CandidateReceipt) -> Self {
|
||||
let votes =
|
||||
CandidateVotes { candidate_receipt, valid: BTreeMap::new(), invalid: BTreeMap::new() };
|
||||
Self {
|
||||
votes,
|
||||
own_vote: OwnVoteState::NoVote,
|
||||
concluded_invalid: false,
|
||||
concluded_valid: false,
|
||||
is_confirmed: false,
|
||||
is_disputed: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new `CandidateVoteState` from already existing votes.
|
||||
pub fn new<'a>(votes: CandidateVotes, env: &CandidateEnvironment<'a>) -> Self {
|
||||
let own_vote = OwnVoteState::new(&votes, env);
|
||||
|
||||
let n_validators = env.validators().len();
|
||||
|
||||
let supermajority_threshold =
|
||||
polkadot_primitives::v2::supermajority_threshold(n_validators);
|
||||
|
||||
let concluded_invalid = votes.invalid.len() >= supermajority_threshold;
|
||||
let concluded_valid = votes.valid.len() >= supermajority_threshold;
|
||||
|
||||
// We have a dispute, if we have votes on both sides:
|
||||
let is_disputed = !votes.invalid.is_empty() && !votes.valid.is_empty();
|
||||
|
||||
let byzantine_threshold = polkadot_primitives::v2::byzantine_threshold(n_validators);
|
||||
let is_confirmed = votes.voted_indices().len() > byzantine_threshold && is_disputed;
|
||||
|
||||
Self { votes, own_vote, concluded_invalid, concluded_valid, is_confirmed, is_disputed }
|
||||
}
|
||||
|
||||
/// Import fresh statements.
|
||||
///
|
||||
/// Result will be a new state plus information about things that changed due to the import.
|
||||
pub fn import_statements(
|
||||
self,
|
||||
env: &CandidateEnvironment,
|
||||
statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
|
||||
) -> ImportResult {
|
||||
let (mut votes, old_state) = self.into_old_state();
|
||||
|
||||
let mut new_invalid_voters = Vec::new();
|
||||
let mut imported_invalid_votes = 0;
|
||||
let mut imported_valid_votes = 0;
|
||||
|
||||
let expected_candidate_hash = votes.candidate_receipt.hash();
|
||||
|
||||
for (statement, val_index) in statements {
|
||||
if env
|
||||
.validators()
|
||||
.get(val_index.0 as usize)
|
||||
.map_or(true, |v| v != statement.validator_public())
|
||||
{
|
||||
gum::error!(
|
||||
target: LOG_TARGET,
|
||||
?val_index,
|
||||
session= ?env.session_index,
|
||||
claimed_key = ?statement.validator_public(),
|
||||
"Validator index doesn't match claimed key",
|
||||
);
|
||||
|
||||
continue
|
||||
}
|
||||
if statement.candidate_hash() != &expected_candidate_hash {
|
||||
gum::error!(
|
||||
target: LOG_TARGET,
|
||||
?val_index,
|
||||
session= ?env.session_index,
|
||||
given_candidate_hash = ?statement.candidate_hash(),
|
||||
?expected_candidate_hash,
|
||||
"Vote is for unexpected candidate!",
|
||||
);
|
||||
continue
|
||||
}
|
||||
if statement.session_index() != env.session_index() {
|
||||
gum::error!(
|
||||
target: LOG_TARGET,
|
||||
?val_index,
|
||||
session= ?env.session_index,
|
||||
given_candidate_hash = ?statement.candidate_hash(),
|
||||
?expected_candidate_hash,
|
||||
"Vote is for unexpected session!",
|
||||
);
|
||||
continue
|
||||
}
|
||||
|
||||
match statement.statement() {
|
||||
DisputeStatement::Valid(valid_kind) => {
|
||||
let fresh = insert_into_statements(
|
||||
&mut votes.valid,
|
||||
*valid_kind,
|
||||
val_index,
|
||||
statement.into_validator_signature(),
|
||||
);
|
||||
|
||||
if fresh {
|
||||
imported_valid_votes += 1;
|
||||
}
|
||||
},
|
||||
DisputeStatement::Invalid(invalid_kind) => {
|
||||
let fresh = insert_into_statements(
|
||||
&mut votes.invalid,
|
||||
*invalid_kind,
|
||||
val_index,
|
||||
statement.into_validator_signature(),
|
||||
);
|
||||
|
||||
if fresh {
|
||||
new_invalid_voters.push(val_index);
|
||||
imported_invalid_votes += 1;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
let new_state = Self::new(votes, env);
|
||||
|
||||
ImportResult {
|
||||
old_state,
|
||||
new_state,
|
||||
imported_invalid_votes,
|
||||
imported_valid_votes,
|
||||
imported_approval_votes: 0,
|
||||
new_invalid_voters,
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve `CandidateReceipt` in `CandidateVotes`.
|
||||
pub fn candidate_receipt(&self) -> &CandidateReceipt {
|
||||
&self.votes.candidate_receipt
|
||||
}
|
||||
|
||||
/// Extract `CandidateVotes` for handling import of new statements.
|
||||
fn into_old_state(self) -> (CandidateVotes, CandidateVoteState<()>) {
|
||||
let CandidateVoteState {
|
||||
votes,
|
||||
own_vote,
|
||||
concluded_invalid,
|
||||
concluded_valid,
|
||||
is_confirmed,
|
||||
is_disputed,
|
||||
} = self;
|
||||
(
|
||||
votes,
|
||||
CandidateVoteState {
|
||||
votes: (),
|
||||
own_vote,
|
||||
concluded_invalid,
|
||||
concluded_valid,
|
||||
is_confirmed,
|
||||
is_disputed,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<V> CandidateVoteState<V> {
|
||||
/// Whether or not we have an ongoing dispute.
|
||||
pub fn is_disputed(&self) -> bool {
|
||||
self.is_disputed
|
||||
}
|
||||
|
||||
/// Whether there is an ongoing confirmed dispute.
|
||||
///
|
||||
/// This checks whether there is a dispute ongoing and we have more than byzantine threshold
|
||||
/// votes.
|
||||
pub fn is_confirmed(&self) -> bool {
|
||||
self.is_confirmed
|
||||
}
|
||||
|
||||
/// This machine already cast some vote in that dispute/for that candidate.
|
||||
pub fn has_own_vote(&self) -> bool {
|
||||
self.own_vote.voted()
|
||||
}
|
||||
|
||||
/// Own approval votes if any:
|
||||
pub fn own_approval_votes(&self) -> Option<&Vec<(ValidatorIndex, ValidatorSignature)>> {
|
||||
self.own_vote.approval_votes()
|
||||
}
|
||||
|
||||
/// Whether or not this dispute has already enough valid votes to conclude.
|
||||
pub fn is_concluded_valid(&self) -> bool {
|
||||
self.concluded_valid
|
||||
}
|
||||
|
||||
/// Whether or not this dispute has already enough invalid votes to conclude.
|
||||
pub fn is_concluded_invalid(&self) -> bool {
|
||||
self.concluded_invalid
|
||||
}
|
||||
|
||||
/// Access to underlying votes.
|
||||
pub fn votes(&self) -> &V {
|
||||
&self.votes
|
||||
}
|
||||
}
|
||||
|
||||
/// An ongoing statement/vote import.
|
||||
pub struct ImportResult {
|
||||
/// The state we had before importing new statements.
|
||||
old_state: CandidateVoteState<()>,
|
||||
/// The new state after importing the new statements.
|
||||
new_state: CandidateVoteState<CandidateVotes>,
|
||||
/// New invalid voters as of this import.
|
||||
new_invalid_voters: Vec<ValidatorIndex>,
|
||||
/// Number of successfully imported valid votes.
|
||||
imported_invalid_votes: u32,
|
||||
/// Number of successfully imported invalid votes.
|
||||
imported_valid_votes: u32,
|
||||
/// Number of approval votes imported via `import_approval_votes()`.
|
||||
///
|
||||
/// And only those: If normal import included approval votes, those are not counted here.
|
||||
///
|
||||
/// In other words, without a call `import_approval_votes()` this will always be 0.
|
||||
imported_approval_votes: u32,
|
||||
}
|
||||
|
||||
impl ImportResult {
|
||||
/// Whether or not anything has changed due to the import.
|
||||
pub fn votes_changed(&self) -> bool {
|
||||
self.imported_valid_votes != 0 || self.imported_invalid_votes != 0
|
||||
}
|
||||
|
||||
/// The dispute state has changed in some way.
|
||||
///
|
||||
/// - freshly disputed
|
||||
/// - freshly confirmed
|
||||
/// - freshly concluded (valid or invalid)
|
||||
pub fn dispute_state_changed(&self) -> bool {
|
||||
self.is_freshly_disputed() || self.is_freshly_confirmed() || self.is_freshly_concluded()
|
||||
}
|
||||
|
||||
/// State as it was before import.
|
||||
pub fn old_state(&self) -> &CandidateVoteState<()> {
|
||||
&self.old_state
|
||||
}
|
||||
|
||||
/// State after import
|
||||
pub fn new_state(&self) -> &CandidateVoteState<CandidateVotes> {
|
||||
&self.new_state
|
||||
}
|
||||
|
||||
/// New "invalid" voters encountered during import.
|
||||
pub fn new_invalid_voters(&self) -> &Vec<ValidatorIndex> {
|
||||
&self.new_invalid_voters
|
||||
}
|
||||
|
||||
/// Number of imported valid votes.
|
||||
pub fn imported_valid_votes(&self) -> u32 {
|
||||
self.imported_valid_votes
|
||||
}
|
||||
|
||||
/// Number of imported invalid votes.
|
||||
pub fn imported_invalid_votes(&self) -> u32 {
|
||||
self.imported_invalid_votes
|
||||
}
|
||||
|
||||
/// Number of imported approval votes.
|
||||
pub fn imported_approval_votes(&self) -> u32 {
|
||||
self.imported_approval_votes
|
||||
}
|
||||
|
||||
/// Whether we now have a dispute and did not prior to the import.
|
||||
pub fn is_freshly_disputed(&self) -> bool {
|
||||
!self.old_state().is_disputed() && self.new_state().is_disputed()
|
||||
}
|
||||
|
||||
/// Whether we just surpassed the byzantine threshold.
|
||||
pub fn is_freshly_confirmed(&self) -> bool {
|
||||
!self.old_state().is_confirmed() && self.new_state().is_confirmed()
|
||||
}
|
||||
|
||||
/// Whether or not any dispute just concluded valid due to the import.
|
||||
pub fn is_freshly_concluded_valid(&self) -> bool {
|
||||
!self.old_state().is_concluded_valid() && self.new_state().is_concluded_valid()
|
||||
}
|
||||
|
||||
/// Whether or not any dispute just concluded invalid due to the import.
|
||||
pub fn is_freshly_concluded_invalid(&self) -> bool {
|
||||
!self.old_state().is_concluded_invalid() && self.new_state().is_concluded_invalid()
|
||||
}
|
||||
|
||||
/// Whether or not any dispute just concluded either invalid or valid due to the import.
|
||||
pub fn is_freshly_concluded(&self) -> bool {
|
||||
self.is_freshly_concluded_invalid() || self.is_freshly_concluded_valid()
|
||||
}
|
||||
|
||||
/// Modify this `ImportResult`s, by importing additional approval votes.
|
||||
///
|
||||
/// Both results and `new_state` will be changed as if those approval votes had been in the
|
||||
/// original import.
|
||||
pub fn import_approval_votes(
|
||||
self,
|
||||
env: &CandidateEnvironment,
|
||||
approval_votes: HashMap<ValidatorIndex, ValidatorSignature>,
|
||||
) -> Self {
|
||||
let Self {
|
||||
old_state,
|
||||
new_state,
|
||||
new_invalid_voters,
|
||||
mut imported_valid_votes,
|
||||
imported_invalid_votes,
|
||||
mut imported_approval_votes,
|
||||
} = self;
|
||||
|
||||
let (mut votes, _) = new_state.into_old_state();
|
||||
|
||||
for (index, sig) in approval_votes.into_iter() {
|
||||
debug_assert!(
|
||||
{
|
||||
let pub_key = &env.session_info().validators[index.0 as usize];
|
||||
let candidate_hash = votes.candidate_receipt.hash();
|
||||
let session_index = env.session_index();
|
||||
DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking)
|
||||
.check_signature(pub_key, candidate_hash, session_index, &sig)
|
||||
.is_ok()
|
||||
},
|
||||
"Signature check for imported approval votes failed! This is a serious bug. Session: {:?}, candidate hash: {:?}, validator index: {:?}", env.session_index(), votes.candidate_receipt.hash(), index
|
||||
);
|
||||
if insert_into_statements(
|
||||
&mut votes.valid,
|
||||
ValidDisputeStatementKind::ApprovalChecking,
|
||||
index,
|
||||
sig,
|
||||
) {
|
||||
imported_valid_votes += 1;
|
||||
imported_approval_votes += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let new_state = CandidateVoteState::new(votes, env);
|
||||
|
||||
Self {
|
||||
old_state,
|
||||
new_state,
|
||||
new_invalid_voters,
|
||||
imported_valid_votes,
|
||||
imported_invalid_votes,
|
||||
imported_approval_votes,
|
||||
}
|
||||
}
|
||||
|
||||
/// All done, give me those votes.
|
||||
///
|
||||
/// Returns: `None` in case nothing has changed (import was redundant).
|
||||
pub fn into_updated_votes(self) -> Option<CandidateVotes> {
|
||||
if self.votes_changed() {
|
||||
let CandidateVoteState { votes, .. } = self.new_state;
|
||||
Some(votes)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Find indices controlled by this validator.
|
||||
///
|
||||
/// That is all `ValidatorIndex`es we have private keys for. Usually this will only be one.
|
||||
fn find_controlled_validator_indices(
|
||||
keystore: &LocalKeystore,
|
||||
validators: &[ValidatorId],
|
||||
) -> HashSet<ValidatorIndex> {
|
||||
let mut controlled = HashSet::new();
|
||||
for (index, validator) in validators.iter().enumerate() {
|
||||
if keystore.key_pair::<ValidatorPair>(validator).ok().flatten().is_none() {
|
||||
continue
|
||||
}
|
||||
|
||||
controlled.insert(ValidatorIndex(index as _));
|
||||
}
|
||||
|
||||
controlled
|
||||
}
|
||||
|
||||
// Returns 'true' if no other vote by that validator was already
|
||||
// present and 'false' otherwise. Same semantics as `HashSet`.
|
||||
fn insert_into_statements<T>(
|
||||
m: &mut BTreeMap<ValidatorIndex, (T, ValidatorSignature)>,
|
||||
tag: T,
|
||||
val_index: ValidatorIndex,
|
||||
val_signature: ValidatorSignature,
|
||||
) -> bool {
|
||||
m.insert(val_index, (tag, val_signature)).is_none()
|
||||
}
|
||||
@@ -16,12 +16,12 @@
|
||||
|
||||
//! Dispute coordinator subsystem in initialized state (after first active leaf is received).
|
||||
|
||||
use std::{
|
||||
collections::{BTreeMap, HashSet},
|
||||
sync::Arc,
|
||||
};
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
use futures::{channel::mpsc, FutureExt, StreamExt};
|
||||
use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
FutureExt, StreamExt,
|
||||
};
|
||||
|
||||
use sc_keystore::LocalKeystore;
|
||||
|
||||
@@ -31,8 +31,8 @@ use polkadot_node_primitives::{
|
||||
};
|
||||
use polkadot_node_subsystem::{
|
||||
messages::{
|
||||
BlockDescription, DisputeCoordinatorMessage, DisputeDistributionMessage,
|
||||
ImportStatementsResult,
|
||||
ApprovalVotingMessage, BlockDescription, DisputeCoordinatorMessage,
|
||||
DisputeDistributionMessage, ImportStatementsResult,
|
||||
},
|
||||
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal,
|
||||
};
|
||||
@@ -40,13 +40,14 @@ use polkadot_node_subsystem_util::rolling_session_window::{
|
||||
RollingSessionWindow, SessionWindowUpdate, SessionsUnavailable,
|
||||
};
|
||||
use polkadot_primitives::v2::{
|
||||
byzantine_threshold, BlockNumber, CandidateHash, CandidateReceipt, CompactStatement,
|
||||
DisputeStatement, DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, SessionInfo,
|
||||
ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
|
||||
BlockNumber, CandidateHash, CandidateReceipt, CompactStatement, DisputeStatement,
|
||||
DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, SessionInfo,
|
||||
ValidDisputeStatementKind, ValidatorId, ValidatorIndex,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
error::{log_error, Error, FatalError, FatalResult, JfyiError, JfyiResult, Result},
|
||||
import::{CandidateEnvironment, CandidateVoteState},
|
||||
metrics::Metrics,
|
||||
status::{get_active_with_status, Clock, DisputeStatus, Timestamp},
|
||||
DisputeCoordinatorSubsystem, LOG_TARGET,
|
||||
@@ -194,11 +195,13 @@ impl Initialized {
|
||||
}
|
||||
|
||||
loop {
|
||||
gum::trace!(target: LOG_TARGET, "Waiting for message");
|
||||
let mut overlay_db = OverlayedBackend::new(backend);
|
||||
let default_confirm = Box::new(|| Ok(()));
|
||||
let confirm_write =
|
||||
match MuxedMessage::receive(ctx, &mut self.participation_receiver).await? {
|
||||
MuxedMessage::Participation(msg) => {
|
||||
gum::trace!(target: LOG_TARGET, "MuxedMessage::Participation");
|
||||
let ParticipationStatement {
|
||||
session,
|
||||
candidate_hash,
|
||||
@@ -206,6 +209,13 @@ impl Initialized {
|
||||
outcome,
|
||||
} = self.participation.get_participation_result(ctx, msg).await?;
|
||||
if let Some(valid) = outcome.validity() {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?session,
|
||||
?candidate_hash,
|
||||
?valid,
|
||||
"Issuing local statement based on participation outcome."
|
||||
);
|
||||
self.issue_local_statement(
|
||||
ctx,
|
||||
&mut overlay_db,
|
||||
@@ -222,6 +232,7 @@ impl Initialized {
|
||||
MuxedMessage::Subsystem(msg) => match msg {
|
||||
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
|
||||
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
|
||||
gum::trace!(target: LOG_TARGET, "OverseerSignal::ActiveLeaves");
|
||||
self.process_active_leaves_update(
|
||||
ctx,
|
||||
&mut overlay_db,
|
||||
@@ -232,6 +243,7 @@ impl Initialized {
|
||||
default_confirm
|
||||
},
|
||||
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_, n)) => {
|
||||
gum::trace!(target: LOG_TARGET, "OverseerSignal::BlockFinalized");
|
||||
self.scraper.process_finalized_block(&n);
|
||||
default_confirm
|
||||
},
|
||||
@@ -349,6 +361,12 @@ impl Initialized {
|
||||
for (candidate_receipt, backers) in backing_validators_per_candidate {
|
||||
let relay_parent = candidate_receipt.descriptor.relay_parent;
|
||||
let candidate_hash = candidate_receipt.hash();
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?relay_parent,
|
||||
"Importing backing votes from chain for candidate"
|
||||
);
|
||||
let statements = backers
|
||||
.into_iter()
|
||||
.filter_map(|(validator_index, attestation)| {
|
||||
@@ -373,6 +391,19 @@ impl Initialized {
|
||||
CompactStatement::Valid(_) =>
|
||||
ValidDisputeStatementKind::BackingValid(relay_parent),
|
||||
};
|
||||
debug_assert!(
|
||||
SignedDisputeStatement::new_checked(
|
||||
DisputeStatement::Valid(valid_statement_kind),
|
||||
candidate_hash,
|
||||
session,
|
||||
validator_public.clone(),
|
||||
validator_signature.clone(),
|
||||
).is_ok(),
|
||||
"Scraped backing votes had invalid signature! candidate: {:?}, session: {:?}, validator_public: {:?}",
|
||||
candidate_hash,
|
||||
session,
|
||||
validator_public,
|
||||
);
|
||||
let signed_dispute_statement =
|
||||
SignedDisputeStatement::new_unchecked_from_trusted_source(
|
||||
DisputeStatement::Valid(valid_statement_kind),
|
||||
@@ -389,7 +420,6 @@ impl Initialized {
|
||||
.handle_import_statements(
|
||||
ctx,
|
||||
overlay_db,
|
||||
candidate_hash,
|
||||
MaybeCandidateReceipt::Provides(candidate_receipt),
|
||||
session,
|
||||
statements,
|
||||
@@ -412,13 +442,19 @@ impl Initialized {
|
||||
}
|
||||
}
|
||||
|
||||
// Import concluded disputes from on-chain, this already went through a vote so it's assumed
|
||||
// Import disputes from on-chain, this already went through a vote so it's assumed
|
||||
// as verified. This will only be stored, gossiping it is not necessary.
|
||||
|
||||
// First try to obtain all the backings which ultimately contain the candidate
|
||||
// receipt which we need.
|
||||
|
||||
for DisputeStatementSet { candidate_hash, session, statements } in disputes {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?session,
|
||||
"Importing dispute votes from chain for candidate"
|
||||
);
|
||||
let statements = statements
|
||||
.into_iter()
|
||||
.filter_map(|(dispute_statement, validator_index, validator_signature)| {
|
||||
@@ -449,6 +485,21 @@ impl Initialized {
|
||||
})
|
||||
.cloned()?;
|
||||
|
||||
debug_assert!(
|
||||
SignedDisputeStatement::new_checked(
|
||||
dispute_statement.clone(),
|
||||
candidate_hash,
|
||||
session,
|
||||
validator_public.clone(),
|
||||
validator_signature.clone(),
|
||||
).is_ok(),
|
||||
"Scraped dispute votes had invalid signature! candidate: {:?}, session: {:?}, dispute_statement: {:?}, validator_public: {:?}",
|
||||
candidate_hash,
|
||||
session,
|
||||
dispute_statement,
|
||||
validator_public,
|
||||
);
|
||||
|
||||
Some((
|
||||
SignedDisputeStatement::new_unchecked_from_trusted_source(
|
||||
dispute_statement,
|
||||
@@ -465,9 +516,8 @@ impl Initialized {
|
||||
.handle_import_statements(
|
||||
ctx,
|
||||
overlay_db,
|
||||
candidate_hash,
|
||||
// TODO <https://github.com/paritytech/polkadot/issues/4011>
|
||||
MaybeCandidateReceipt::AssumeBackingVotePresent,
|
||||
MaybeCandidateReceipt::AssumeBackingVotePresent(candidate_hash),
|
||||
session,
|
||||
statements,
|
||||
now,
|
||||
@@ -478,13 +528,13 @@ impl Initialized {
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?session,
|
||||
"Imported statement of concluded dispute from on-chain"
|
||||
"Imported statement of dispute from on-chain"
|
||||
),
|
||||
ImportStatementsResult::InvalidImport => gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?session,
|
||||
"Attempted import of on-chain statement of concluded dispute failed"
|
||||
"Attempted import of on-chain statement of dispute failed"
|
||||
),
|
||||
}
|
||||
}
|
||||
@@ -501,17 +551,21 @@ impl Initialized {
|
||||
) -> Result<Box<dyn FnOnce() -> JfyiResult<()>>> {
|
||||
match message {
|
||||
DisputeCoordinatorMessage::ImportStatements {
|
||||
candidate_hash,
|
||||
candidate_receipt,
|
||||
session,
|
||||
statements,
|
||||
pending_confirmation,
|
||||
} => {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
candidate_hash = ?candidate_receipt.hash(),
|
||||
?session,
|
||||
"DisputeCoordinatorMessage::ImportStatements"
|
||||
);
|
||||
let outcome = self
|
||||
.handle_import_statements(
|
||||
ctx,
|
||||
overlay_db,
|
||||
candidate_hash,
|
||||
MaybeCandidateReceipt::Provides(candidate_receipt),
|
||||
session,
|
||||
statements,
|
||||
@@ -537,11 +591,13 @@ impl Initialized {
|
||||
// Return error if session information is missing.
|
||||
self.ensure_available_session_info()?;
|
||||
|
||||
gum::trace!(target: LOG_TARGET, "Loading recent disputes from db");
|
||||
let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
|
||||
disputes
|
||||
} else {
|
||||
BTreeMap::new()
|
||||
};
|
||||
gum::trace!(target: LOG_TARGET, "Loaded recent disputes from db");
|
||||
|
||||
let _ = tx.send(recent_disputes.keys().cloned().collect());
|
||||
},
|
||||
@@ -549,6 +605,8 @@ impl Initialized {
|
||||
// Return error if session information is missing.
|
||||
self.ensure_available_session_info()?;
|
||||
|
||||
gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::ActiveDisputes");
|
||||
|
||||
let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
|
||||
disputes
|
||||
} else {
|
||||
@@ -565,6 +623,8 @@ impl Initialized {
|
||||
// Return error if session information is missing.
|
||||
self.ensure_available_session_info()?;
|
||||
|
||||
gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::QueryCandidateVotes");
|
||||
|
||||
let mut query_output = Vec::new();
|
||||
for (session_index, candidate_hash) in query {
|
||||
if let Some(v) =
|
||||
@@ -587,6 +647,7 @@ impl Initialized {
|
||||
candidate_receipt,
|
||||
valid,
|
||||
) => {
|
||||
gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::IssueLocalStatement");
|
||||
self.issue_local_statement(
|
||||
ctx,
|
||||
overlay_db,
|
||||
@@ -605,6 +666,10 @@ impl Initialized {
|
||||
} => {
|
||||
// Return error if session information is missing.
|
||||
self.ensure_available_session_info()?;
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
"DisputeCoordinatorMessage::DetermineUndisputedChain"
|
||||
);
|
||||
|
||||
let undisputed_chain = determine_undisputed_chain(
|
||||
overlay_db,
|
||||
@@ -633,200 +698,185 @@ impl Initialized {
|
||||
&mut self,
|
||||
ctx: &mut Context,
|
||||
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
|
||||
candidate_hash: CandidateHash,
|
||||
candidate_receipt: MaybeCandidateReceipt,
|
||||
session: SessionIndex,
|
||||
statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
|
||||
now: Timestamp,
|
||||
) -> Result<ImportStatementsResult> {
|
||||
gum::trace!(target: LOG_TARGET, ?statements, "In handle import statements");
|
||||
if session + DISPUTE_WINDOW.get() < self.highest_session {
|
||||
// It is not valid to participate in an ancient dispute (spam?).
|
||||
return Ok(ImportStatementsResult::InvalidImport)
|
||||
}
|
||||
|
||||
let session_info = match self.rolling_session_window.session_info(session) {
|
||||
None => {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
session,
|
||||
"Importing statement lacks info for session which has an active dispute",
|
||||
);
|
||||
let env =
|
||||
match CandidateEnvironment::new(&*self.keystore, &self.rolling_session_window, session)
|
||||
{
|
||||
None => {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
session,
|
||||
"We are lacking a `SessionInfo` for handling import of statements."
|
||||
);
|
||||
|
||||
return Ok(ImportStatementsResult::InvalidImport)
|
||||
},
|
||||
Some(info) => info,
|
||||
};
|
||||
let validators = session_info.validators.clone();
|
||||
return Ok(ImportStatementsResult::InvalidImport)
|
||||
},
|
||||
Some(env) => env,
|
||||
};
|
||||
|
||||
let n_validators = validators.len();
|
||||
let candidate_hash = candidate_receipt.hash();
|
||||
|
||||
let supermajority_threshold =
|
||||
polkadot_primitives::v2::supermajority_threshold(n_validators);
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?session,
|
||||
num_validators = ?env.session_info().validators.len(),
|
||||
"Number of validators"
|
||||
);
|
||||
|
||||
// In case we are not provided with a candidate receipt
|
||||
// we operate under the assumption, that a previous vote
|
||||
// which included a `CandidateReceipt` was seen.
|
||||
// This holds since every block is preceeded by the `Backing`-phase.
|
||||
// This holds since every block is preceded by the `Backing`-phase.
|
||||
//
|
||||
// There is one exception: A sufficiently sophisticated attacker could prevent
|
||||
// us from seeing the backing votes by witholding arbitrary blocks, and hence we do
|
||||
// us from seeing the backing votes by withholding arbitrary blocks, and hence we do
|
||||
// not have a `CandidateReceipt` available.
|
||||
let (mut votes, mut votes_changed) = match overlay_db
|
||||
let old_state = match overlay_db
|
||||
.load_candidate_votes(session, &candidate_hash)?
|
||||
.map(CandidateVotes::from)
|
||||
{
|
||||
Some(votes) => (votes, false),
|
||||
Some(votes) => CandidateVoteState::new(votes, &env),
|
||||
None =>
|
||||
if let MaybeCandidateReceipt::Provides(candidate_receipt) = candidate_receipt {
|
||||
(
|
||||
CandidateVotes {
|
||||
candidate_receipt,
|
||||
valid: Vec::new(),
|
||||
invalid: Vec::new(),
|
||||
},
|
||||
true,
|
||||
)
|
||||
CandidateVoteState::new_from_receipt(candidate_receipt)
|
||||
} else {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
session,
|
||||
"Not seen backing vote for candidate which has an active dispute",
|
||||
?candidate_hash,
|
||||
"Cannot import votes, without `CandidateReceipt` available!"
|
||||
);
|
||||
return Ok(ImportStatementsResult::InvalidImport)
|
||||
},
|
||||
};
|
||||
let candidate_receipt = votes.candidate_receipt.clone();
|
||||
let was_concluded_valid = votes.valid.len() >= supermajority_threshold;
|
||||
let was_concluded_invalid = votes.invalid.len() >= supermajority_threshold;
|
||||
|
||||
let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
|
||||
let controlled_indices = find_controlled_validator_indices(&self.keystore, &validators);
|
||||
gum::trace!(target: LOG_TARGET, ?candidate_hash, ?session, "Loaded votes");
|
||||
|
||||
// Whether we already cast a vote in that dispute:
|
||||
let voted_already = {
|
||||
let mut our_votes = votes.voted_indices();
|
||||
our_votes.retain(|index| controlled_indices.contains(index));
|
||||
!our_votes.is_empty()
|
||||
let import_result = {
|
||||
let intermediate_result = old_state.import_statements(&env, statements);
|
||||
|
||||
// Handle approval vote import:
|
||||
//
|
||||
// See guide: We import on fresh disputes to maximize likelihood of fetching votes for
|
||||
// dead forks and once concluded to maximize time for approval votes to trickle in.
|
||||
if intermediate_result.is_freshly_disputed() ||
|
||||
intermediate_result.is_freshly_concluded()
|
||||
{
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?session,
|
||||
"Requesting approval signatures"
|
||||
);
|
||||
let (tx, rx) = oneshot::channel();
|
||||
// Use of unbounded channels justified because:
|
||||
// 1. Only triggered twice per dispute.
|
||||
// 2. Raising a dispute is costly (requires validation + recovery) by honest nodes,
|
||||
// dishonest nodes are limited by spam slots.
|
||||
// 3. Concluding a dispute is even more costly.
|
||||
// Therefore it is reasonable to expect a simple vote request to succeed way faster
|
||||
// than disputes are raised.
|
||||
// 4. We are waiting (and blocking the whole subsystem) on a response right after -
|
||||
// therefore even with all else failing we will never have more than
|
||||
// one message in flight at any given time.
|
||||
ctx.send_unbounded_message(
|
||||
ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate_hash, tx),
|
||||
);
|
||||
match rx.await {
|
||||
Err(_) => {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Fetch for approval votes got cancelled, only expected during shutdown!"
|
||||
);
|
||||
intermediate_result
|
||||
},
|
||||
Ok(votes) => intermediate_result.import_approval_votes(&env, votes),
|
||||
}
|
||||
} else {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?session,
|
||||
"Not requested approval signatures"
|
||||
);
|
||||
intermediate_result
|
||||
}
|
||||
};
|
||||
|
||||
let was_confirmed = recent_disputes
|
||||
.get(&(session, candidate_hash))
|
||||
.map_or(false, |s| s.is_confirmed_concluded());
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?session,
|
||||
num_validators = ?env.session_info().validators.len(),
|
||||
"Import result ready"
|
||||
);
|
||||
let new_state = import_result.new_state();
|
||||
|
||||
let is_included = self.scraper.is_candidate_included(&candidate_receipt.hash());
|
||||
let is_included = self.scraper.is_candidate_included(&candidate_hash);
|
||||
|
||||
let is_local = statements
|
||||
.iter()
|
||||
.find(|(_, index)| controlled_indices.contains(index))
|
||||
.is_some();
|
||||
let potential_spam = !is_included && !new_state.is_confirmed() && !new_state.has_own_vote();
|
||||
|
||||
// Indexes of the validators issued 'invalid' statements. Will be used to populate spam slots.
|
||||
let mut fresh_invalid_statement_issuers = Vec::new();
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
has_own_vote = ?new_state.has_own_vote(),
|
||||
?potential_spam,
|
||||
?is_included,
|
||||
?candidate_hash,
|
||||
confirmed = ?new_state.is_confirmed(),
|
||||
has_invalid_voters = ?!import_result.new_invalid_voters().is_empty(),
|
||||
"Is spam?"
|
||||
);
|
||||
|
||||
// Update candidate votes.
|
||||
for (statement, val_index) in &statements {
|
||||
if validators
|
||||
.get(val_index.0 as usize)
|
||||
.map_or(true, |v| v != statement.validator_public())
|
||||
{
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
?val_index,
|
||||
session,
|
||||
claimed_key = ?statement.validator_public(),
|
||||
"Validator index doesn't match claimed key",
|
||||
);
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
match statement.statement() {
|
||||
DisputeStatement::Valid(valid_kind) => {
|
||||
let fresh = insert_into_statement_vec(
|
||||
&mut votes.valid,
|
||||
*valid_kind,
|
||||
*val_index,
|
||||
statement.validator_signature().clone(),
|
||||
);
|
||||
|
||||
if !fresh {
|
||||
continue
|
||||
}
|
||||
|
||||
votes_changed = true;
|
||||
self.metrics.on_valid_vote();
|
||||
},
|
||||
DisputeStatement::Invalid(invalid_kind) => {
|
||||
let fresh = insert_into_statement_vec(
|
||||
&mut votes.invalid,
|
||||
*invalid_kind,
|
||||
*val_index,
|
||||
statement.validator_signature().clone(),
|
||||
);
|
||||
|
||||
if !fresh {
|
||||
continue
|
||||
}
|
||||
|
||||
fresh_invalid_statement_issuers.push(*val_index);
|
||||
votes_changed = true;
|
||||
self.metrics.on_invalid_vote();
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Whether or not we know already that this is a good dispute:
|
||||
//
|
||||
// Note we can only know for sure whether we reached the `byzantine_threshold` after
|
||||
// updating candidate votes above, therefore the spam checking is afterwards:
|
||||
let is_confirmed = is_included ||
|
||||
was_confirmed ||
|
||||
is_local || votes.voted_indices().len() >
|
||||
byzantine_threshold(n_validators);
|
||||
if !potential_spam {
|
||||
// Former spammers have not been spammers after all:
|
||||
self.spam_slots.clear(&(session, candidate_hash));
|
||||
|
||||
// Potential spam:
|
||||
if !is_confirmed && !fresh_invalid_statement_issuers.is_empty() {
|
||||
let mut free_spam_slots_available = true;
|
||||
// Only allow import if all validators voting invalid, have not exceeded
|
||||
// their spam slots:
|
||||
for index in fresh_invalid_statement_issuers {
|
||||
} else if !import_result.new_invalid_voters().is_empty() {
|
||||
let mut free_spam_slots_available = false;
|
||||
// Only allow import if at least one validator voting invalid, has not exceeded
|
||||
// its spam slots:
|
||||
for index in import_result.new_invalid_voters() {
|
||||
// Disputes can only be triggered via an invalidity stating vote, thus we only
|
||||
// need to increase spam slots on invalid votes. (If we did not, we would also
|
||||
// increase spam slots for backing validators for example - as validators have to
|
||||
// provide some opposing vote for dispute-distribution).
|
||||
free_spam_slots_available &=
|
||||
self.spam_slots.add_unconfirmed(session, candidate_hash, index);
|
||||
free_spam_slots_available |=
|
||||
self.spam_slots.add_unconfirmed(session, candidate_hash, *index);
|
||||
}
|
||||
// Only validity stating votes or validator had free spam slot?
|
||||
if !free_spam_slots_available {
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?session,
|
||||
?statements,
|
||||
invalid_voters = ?import_result.new_invalid_voters(),
|
||||
"Rejecting import because of full spam slots."
|
||||
);
|
||||
return Ok(ImportStatementsResult::InvalidImport)
|
||||
}
|
||||
}
|
||||
|
||||
if is_confirmed && !was_confirmed {
|
||||
// Former spammers have not been spammers after all:
|
||||
self.spam_slots.clear(&(session, candidate_hash));
|
||||
}
|
||||
|
||||
// Check if newly disputed.
|
||||
let is_disputed = !votes.valid.is_empty() && !votes.invalid.is_empty();
|
||||
let concluded_valid = votes.valid.len() >= supermajority_threshold;
|
||||
let concluded_invalid = votes.invalid.len() >= supermajority_threshold;
|
||||
|
||||
// Participate in dispute if the imported vote was not local, we did not vote before either
|
||||
// and we actually have keys to issue a local vote.
|
||||
if !is_local && !voted_already && is_disputed && !controlled_indices.is_empty() {
|
||||
// Participate in dispute if we did not cast a vote before and actually have keys to cast a
|
||||
// local vote:
|
||||
if !new_state.has_own_vote() &&
|
||||
new_state.is_disputed() &&
|
||||
!env.controlled_indices().is_empty()
|
||||
{
|
||||
let priority = ParticipationPriority::with_priority_if(is_included);
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
candidate_hash = ?candidate_receipt.hash(),
|
||||
?candidate_hash,
|
||||
?priority,
|
||||
"Queuing participation for candidate"
|
||||
);
|
||||
@@ -835,22 +885,77 @@ impl Initialized {
|
||||
} else {
|
||||
self.metrics.on_queued_best_effort_participation();
|
||||
}
|
||||
// Participate whenever the imported vote was local & we did not had no cast
|
||||
// previously:
|
||||
let r = self
|
||||
.participation
|
||||
.queue_participation(
|
||||
ctx,
|
||||
priority,
|
||||
ParticipationRequest::new(candidate_receipt, session, n_validators),
|
||||
ParticipationRequest::new(
|
||||
new_state.candidate_receipt().clone(),
|
||||
session,
|
||||
env.validators().len(),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
log_error(r)?;
|
||||
}
|
||||
|
||||
let prev_status = recent_disputes.get(&(session, candidate_hash)).map(|x| x.clone());
|
||||
// Also send any already existing approval vote on new disputes:
|
||||
if import_result.is_freshly_disputed() {
|
||||
let no_votes = Vec::new();
|
||||
let our_approval_votes = new_state.own_approval_votes().unwrap_or(&no_votes);
|
||||
for (validator_index, sig) in our_approval_votes {
|
||||
let pub_key = match env.validators().get(validator_index.0 as usize) {
|
||||
None => {
|
||||
gum::error!(
|
||||
target: LOG_TARGET,
|
||||
?validator_index,
|
||||
?session,
|
||||
"Could not find pub key in `SessionInfo` for our own approval vote!"
|
||||
);
|
||||
continue
|
||||
},
|
||||
Some(k) => k,
|
||||
};
|
||||
let statement = SignedDisputeStatement::new_unchecked_from_trusted_source(
|
||||
DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking),
|
||||
candidate_hash,
|
||||
session,
|
||||
pub_key.clone(),
|
||||
sig.clone(),
|
||||
);
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?session,
|
||||
?validator_index,
|
||||
"Sending out own approval vote"
|
||||
);
|
||||
match make_dispute_message(
|
||||
env.session_info(),
|
||||
&new_state.votes(),
|
||||
statement,
|
||||
*validator_index,
|
||||
) {
|
||||
Err(err) => {
|
||||
gum::error!(
|
||||
target: LOG_TARGET,
|
||||
?err,
|
||||
"No ongoing dispute, but we checked there is one!"
|
||||
);
|
||||
},
|
||||
Ok(dispute_message) => {
|
||||
ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_message))
|
||||
.await;
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// All good, update recent disputes if state has changed:
|
||||
if import_result.dispute_state_changed() {
|
||||
let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
|
||||
|
||||
let status = if is_disputed {
|
||||
let status = recent_disputes.entry((session, candidate_hash)).or_insert_with(|| {
|
||||
gum::info!(
|
||||
target: LOG_TARGET,
|
||||
@@ -861,57 +966,73 @@ impl Initialized {
|
||||
DisputeStatus::active()
|
||||
});
|
||||
|
||||
if is_confirmed {
|
||||
if new_state.is_confirmed() {
|
||||
*status = status.confirm();
|
||||
}
|
||||
|
||||
// Note: concluded-invalid overwrites concluded-valid,
|
||||
// so we do this check first. Dispute state machine is
|
||||
// non-commutative.
|
||||
if concluded_valid {
|
||||
if new_state.is_concluded_valid() {
|
||||
*status = status.concluded_for(now);
|
||||
}
|
||||
|
||||
if concluded_invalid {
|
||||
if new_state.is_concluded_invalid() {
|
||||
*status = status.concluded_against(now);
|
||||
}
|
||||
|
||||
Some(*status)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if status != prev_status {
|
||||
if prev_status.is_none() {
|
||||
self.metrics.on_open();
|
||||
}
|
||||
|
||||
if !was_concluded_valid && concluded_valid {
|
||||
gum::info!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
session,
|
||||
"Dispute on candidate concluded with 'valid' result",
|
||||
);
|
||||
self.metrics.on_concluded_valid();
|
||||
}
|
||||
|
||||
if !was_concluded_invalid && concluded_invalid {
|
||||
gum::info!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
session,
|
||||
"Dispute on candidate concluded with 'invalid' result",
|
||||
);
|
||||
self.metrics.on_concluded_invalid();
|
||||
}
|
||||
|
||||
// Only write when updated:
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?status,
|
||||
is_concluded_valid = ?new_state.is_concluded_valid(),
|
||||
is_concluded_invalid = ?new_state.is_concluded_invalid(),
|
||||
"Writing recent disputes with updates for candidate"
|
||||
);
|
||||
overlay_db.write_recent_disputes(recent_disputes);
|
||||
}
|
||||
|
||||
// Update metrics:
|
||||
if import_result.is_freshly_disputed() {
|
||||
self.metrics.on_open();
|
||||
}
|
||||
self.metrics.on_valid_votes(import_result.imported_valid_votes());
|
||||
self.metrics.on_invalid_votes(import_result.imported_invalid_votes());
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?session,
|
||||
imported_approval_votes = ?import_result.imported_approval_votes(),
|
||||
imported_valid_votes = ?import_result.imported_valid_votes(),
|
||||
imported_invalid_votes = ?import_result.imported_invalid_votes(),
|
||||
total_valid_votes = ?import_result.new_state().votes().valid.len(),
|
||||
total_invalid_votes = ?import_result.new_state().votes().invalid.len(),
|
||||
confirmed = ?import_result.new_state().is_confirmed(),
|
||||
"Import summary"
|
||||
);
|
||||
|
||||
self.metrics.on_approval_votes(import_result.imported_approval_votes());
|
||||
if import_result.is_freshly_concluded_valid() {
|
||||
gum::info!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
session,
|
||||
"Dispute on candidate concluded with 'valid' result",
|
||||
);
|
||||
self.metrics.on_concluded_valid();
|
||||
}
|
||||
if import_result.is_freshly_concluded_invalid() {
|
||||
gum::info!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
session,
|
||||
"Dispute on candidate concluded with 'invalid' result",
|
||||
);
|
||||
self.metrics.on_concluded_invalid();
|
||||
}
|
||||
|
||||
// Only write when votes have changed.
|
||||
if votes_changed {
|
||||
if let Some(votes) = import_result.into_updated_votes() {
|
||||
overlay_db.write_candidate_votes(session, candidate_hash, votes.into());
|
||||
}
|
||||
|
||||
@@ -928,29 +1049,37 @@ impl Initialized {
|
||||
valid: bool,
|
||||
now: Timestamp,
|
||||
) -> Result<()> {
|
||||
// Load session info.
|
||||
let info = match self.rolling_session_window.session_info(session) {
|
||||
None => {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
session,
|
||||
"Missing info for session which has an active dispute",
|
||||
);
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?candidate_hash,
|
||||
?session,
|
||||
?valid,
|
||||
?now,
|
||||
"Issuing local statement for candidate!"
|
||||
);
|
||||
// Load environment:
|
||||
let env =
|
||||
match CandidateEnvironment::new(&*self.keystore, &self.rolling_session_window, session)
|
||||
{
|
||||
None => {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
session,
|
||||
"Missing info for session which has an active dispute",
|
||||
);
|
||||
|
||||
return Ok(())
|
||||
},
|
||||
Some(info) => info,
|
||||
};
|
||||
|
||||
let validators = info.validators.clone();
|
||||
return Ok(())
|
||||
},
|
||||
Some(env) => env,
|
||||
};
|
||||
|
||||
let votes = overlay_db
|
||||
.load_candidate_votes(session, &candidate_hash)?
|
||||
.map(CandidateVotes::from)
|
||||
.unwrap_or_else(|| CandidateVotes {
|
||||
candidate_receipt: candidate_receipt.clone(),
|
||||
valid: Vec::new(),
|
||||
invalid: Vec::new(),
|
||||
valid: BTreeMap::new(),
|
||||
invalid: BTreeMap::new(),
|
||||
});
|
||||
|
||||
// Sign a statement for each validator index we control which has
|
||||
@@ -958,8 +1087,7 @@ impl Initialized {
|
||||
let voted_indices = votes.voted_indices();
|
||||
let mut statements = Vec::new();
|
||||
|
||||
let voted_indices: HashSet<_> = voted_indices.into_iter().collect();
|
||||
let controlled_indices = find_controlled_validator_indices(&self.keystore, &validators[..]);
|
||||
let controlled_indices = env.controlled_indices();
|
||||
for index in controlled_indices {
|
||||
if voted_indices.contains(&index) {
|
||||
continue
|
||||
@@ -971,13 +1099,13 @@ impl Initialized {
|
||||
valid,
|
||||
candidate_hash,
|
||||
session,
|
||||
validators[index.0 as usize].clone(),
|
||||
env.validators()[index.0 as usize].clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(Some(signed_dispute_statement)) => {
|
||||
statements.push((signed_dispute_statement, index));
|
||||
statements.push((signed_dispute_statement, *index));
|
||||
},
|
||||
Ok(None) => {},
|
||||
Err(e) => {
|
||||
@@ -993,7 +1121,7 @@ impl Initialized {
|
||||
// Get our message out:
|
||||
for (statement, index) in &statements {
|
||||
let dispute_message =
|
||||
match make_dispute_message(info, &votes, statement.clone(), *index) {
|
||||
match make_dispute_message(env.session_info(), &votes, statement.clone(), *index) {
|
||||
Err(err) => {
|
||||
gum::debug!(target: LOG_TARGET, ?err, "Creating dispute message failed.");
|
||||
continue
|
||||
@@ -1010,7 +1138,6 @@ impl Initialized {
|
||||
.handle_import_statements(
|
||||
ctx,
|
||||
overlay_db,
|
||||
candidate_hash,
|
||||
MaybeCandidateReceipt::Provides(candidate_receipt),
|
||||
session,
|
||||
statements,
|
||||
@@ -1066,29 +1193,22 @@ impl MuxedMessage {
|
||||
}
|
||||
}
|
||||
|
||||
// Returns 'true' if no other vote by that validator was already
|
||||
// present and 'false' otherwise. Same semantics as `HashSet`.
|
||||
fn insert_into_statement_vec<T>(
|
||||
vec: &mut Vec<(T, ValidatorIndex, ValidatorSignature)>,
|
||||
tag: T,
|
||||
val_index: ValidatorIndex,
|
||||
val_signature: ValidatorSignature,
|
||||
) -> bool {
|
||||
let pos = match vec.binary_search_by_key(&val_index, |x| x.1) {
|
||||
Ok(_) => return false, // no duplicates needed.
|
||||
Err(p) => p,
|
||||
};
|
||||
|
||||
vec.insert(pos, (tag, val_index, val_signature));
|
||||
true
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum MaybeCandidateReceipt {
|
||||
/// Directly provides the candiate receipt.
|
||||
/// Directly provides the candidate receipt.
|
||||
Provides(CandidateReceipt),
|
||||
/// Assumes it was seen before by means of seconded message.
|
||||
AssumeBackingVotePresent,
|
||||
AssumeBackingVotePresent(CandidateHash),
|
||||
}
|
||||
|
||||
impl MaybeCandidateReceipt {
|
||||
/// Retrieve `CandidateHash` for the corresponding candidate.
|
||||
pub fn hash(&self) -> CandidateHash {
|
||||
match self {
|
||||
Self::Provides(receipt) => receipt.hash(),
|
||||
Self::AssumeBackingVotePresent(hash) => *hash,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -1113,35 +1233,35 @@ fn make_dispute_message(
|
||||
|
||||
let (valid_statement, valid_index, invalid_statement, invalid_index) =
|
||||
if let DisputeStatement::Valid(_) = our_vote.statement() {
|
||||
let (statement_kind, validator_index, validator_signature) =
|
||||
votes.invalid.get(0).ok_or(DisputeMessageCreationError::NoOppositeVote)?.clone();
|
||||
let (validator_index, (statement_kind, validator_signature)) =
|
||||
votes.invalid.iter().next().ok_or(DisputeMessageCreationError::NoOppositeVote)?;
|
||||
let other_vote = SignedDisputeStatement::new_checked(
|
||||
DisputeStatement::Invalid(statement_kind),
|
||||
DisputeStatement::Invalid(*statement_kind),
|
||||
our_vote.candidate_hash().clone(),
|
||||
our_vote.session_index(),
|
||||
validators
|
||||
.get(validator_index.0 as usize)
|
||||
.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
|
||||
.clone(),
|
||||
validator_signature,
|
||||
validator_signature.clone(),
|
||||
)
|
||||
.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
|
||||
(our_vote, our_index, other_vote, validator_index)
|
||||
(our_vote, our_index, other_vote, *validator_index)
|
||||
} else {
|
||||
let (statement_kind, validator_index, validator_signature) =
|
||||
votes.valid.get(0).ok_or(DisputeMessageCreationError::NoOppositeVote)?.clone();
|
||||
let (validator_index, (statement_kind, validator_signature)) =
|
||||
votes.valid.iter().next().ok_or(DisputeMessageCreationError::NoOppositeVote)?;
|
||||
let other_vote = SignedDisputeStatement::new_checked(
|
||||
DisputeStatement::Valid(statement_kind),
|
||||
DisputeStatement::Valid(*statement_kind),
|
||||
our_vote.candidate_hash().clone(),
|
||||
our_vote.session_index(),
|
||||
validators
|
||||
.get(validator_index.0 as usize)
|
||||
.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
|
||||
.clone(),
|
||||
validator_signature,
|
||||
validator_signature.clone(),
|
||||
)
|
||||
.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
|
||||
(other_vote, validator_index, our_vote, our_index)
|
||||
(other_vote, *validator_index, our_vote, our_index)
|
||||
};
|
||||
|
||||
DisputeMessage::from_signed_statements(
|
||||
@@ -1155,7 +1275,7 @@ fn make_dispute_message(
|
||||
.map_err(DisputeMessageCreationError::InvalidStatementCombination)
|
||||
}
|
||||
|
||||
/// Determine the the best block and its block number.
|
||||
/// Determine the best block and its block number.
|
||||
/// Assumes `block_descriptions` are sorted from the one
|
||||
/// with the lowest `BlockNumber` to the highest.
|
||||
fn determine_undisputed_chain(
|
||||
@@ -1194,19 +1314,3 @@ fn determine_undisputed_chain(
|
||||
|
||||
Ok(last)
|
||||
}
|
||||
|
||||
fn find_controlled_validator_indices(
|
||||
keystore: &LocalKeystore,
|
||||
validators: &[ValidatorId],
|
||||
) -> HashSet<ValidatorIndex> {
|
||||
let mut controlled = HashSet::new();
|
||||
for (index, validator) in validators.iter().enumerate() {
|
||||
if keystore.key_pair::<ValidatorPair>(validator).ok().flatten().is_none() {
|
||||
continue
|
||||
}
|
||||
|
||||
controlled.insert(ValidatorIndex(index as _));
|
||||
}
|
||||
|
||||
controlled
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@
|
||||
//! validation results as well as a sink for votes received by other subsystems. When importing a dispute vote from
|
||||
//! another node, this will trigger dispute participation to recover and validate the block.
|
||||
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::FutureExt;
|
||||
|
||||
@@ -89,6 +89,9 @@ mod spam_slots;
|
||||
/// if there are lots of them.
|
||||
pub(crate) mod participation;
|
||||
|
||||
/// Pure processing of vote imports.
|
||||
pub(crate) mod import;
|
||||
|
||||
/// Metrics types.
|
||||
mod metrics;
|
||||
|
||||
@@ -302,7 +305,7 @@ impl DisputeCoordinatorSubsystem {
|
||||
};
|
||||
|
||||
let n_validators = validators.len();
|
||||
let voted_indices: HashSet<_> = votes.voted_indices().into_iter().collect();
|
||||
let voted_indices = votes.voted_indices();
|
||||
|
||||
// Determine if there are any missing local statements for this dispute. Validators are
|
||||
// filtered if:
|
||||
|
||||
@@ -22,6 +22,8 @@ struct MetricsInner {
|
||||
open: prometheus::Counter<prometheus::U64>,
|
||||
/// Votes of all disputes.
|
||||
votes: prometheus::CounterVec<prometheus::U64>,
|
||||
/// Number of approval votes explicitly fetched from approval voting.
|
||||
approval_votes: prometheus::Counter<prometheus::U64>,
|
||||
/// Conclusion across all disputes.
|
||||
concluded: prometheus::CounterVec<prometheus::U64>,
|
||||
/// Number of participations that have been queued.
|
||||
@@ -41,15 +43,21 @@ impl Metrics {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn on_valid_vote(&self) {
|
||||
pub(crate) fn on_valid_votes(&self, vote_count: u32) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.votes.with_label_values(&["valid"]).inc();
|
||||
metrics.votes.with_label_values(&["valid"]).inc_by(vote_count as _);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn on_invalid_vote(&self) {
|
||||
pub(crate) fn on_invalid_votes(&self, vote_count: u32) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.votes.with_label_values(&["invalid"]).inc();
|
||||
metrics.votes.with_label_values(&["invalid"]).inc_by(vote_count as _);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn on_approval_votes(&self, vote_count: u32) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.approval_votes.inc_by(vote_count as _);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,6 +120,13 @@ impl metrics::Metrics for Metrics {
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
approval_votes: prometheus::register(
|
||||
prometheus::Counter::with_opts(prometheus::Opts::new(
|
||||
"polkadot_parachain_dispute_candidate_approval_votes_fetched_total",
|
||||
"Number of approval votes fetched from approval voting.",
|
||||
))?,
|
||||
registry,
|
||||
)?,
|
||||
queued_participations: prometheus::register(
|
||||
prometheus::CounterVec::new(
|
||||
prometheus::Opts::new(
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
|
||||
use polkadot_primitives::v2::{CandidateHash, SessionIndex, ValidatorIndex};
|
||||
|
||||
@@ -54,7 +54,7 @@ pub struct SpamSlots {
|
||||
}
|
||||
|
||||
/// Unconfirmed disputes to be passed at initialization.
|
||||
pub type UnconfirmedDisputes = HashMap<(SessionIndex, CandidateHash), HashSet<ValidatorIndex>>;
|
||||
pub type UnconfirmedDisputes = HashMap<(SessionIndex, CandidateHash), BTreeSet<ValidatorIndex>>;
|
||||
|
||||
impl SpamSlots {
|
||||
/// Recover `SpamSlots` from state on startup.
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -97,7 +97,7 @@ impl ParachainsInherentDataProvider {
|
||||
Err(err) => {
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
?err,
|
||||
%err,
|
||||
"Could not get provisioner inherent data; injecting default data",
|
||||
);
|
||||
ParachainsInherentData {
|
||||
|
||||
@@ -58,6 +58,9 @@ pub enum Error {
|
||||
#[error("failed to send message to CandidateBacking to get backed candidates")]
|
||||
GetBackedCandidatesSend(#[source] mpsc::SendError),
|
||||
|
||||
#[error("Send inherent data timeout.")]
|
||||
SendInherentDataTimeout,
|
||||
|
||||
#[error("failed to send return message with Inherents")]
|
||||
InherentDataReturnChannel,
|
||||
|
||||
|
||||
@@ -35,7 +35,9 @@ use polkadot_node_subsystem::{
|
||||
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, LeafStatus, OverseerSignal,
|
||||
PerLeafSpan, SpawnedSubsystem, SubsystemError,
|
||||
};
|
||||
use polkadot_node_subsystem_util::{request_availability_cores, request_persisted_validation_data};
|
||||
use polkadot_node_subsystem_util::{
|
||||
request_availability_cores, request_persisted_validation_data, TimeoutExt,
|
||||
};
|
||||
use polkadot_primitives::v2::{
|
||||
BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, DisputeState,
|
||||
DisputeStatement, DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption,
|
||||
@@ -55,6 +57,8 @@ mod tests;
|
||||
|
||||
/// How long to wait before proposing.
|
||||
const PRE_PROPOSE_TIMEOUT: std::time::Duration = core::time::Duration::from_millis(2000);
|
||||
/// Some timeout to ensure task won't hang around in the background forever on issues.
|
||||
const SEND_INHERENT_DATA_TIMEOUT: std::time::Duration = core::time::Duration::from_millis(500);
|
||||
|
||||
const LOG_TARGET: &str = "parachain::provisioner";
|
||||
|
||||
@@ -153,6 +157,12 @@ async fn run_iteration<Context>(
|
||||
if let Some(state) = per_relay_parent.get_mut(&hash) {
|
||||
state.is_inherent_ready = true;
|
||||
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = ?hash,
|
||||
"Inherent Data became ready"
|
||||
);
|
||||
|
||||
let return_senders = std::mem::take(&mut state.awaiting_inherent);
|
||||
if !return_senders.is_empty() {
|
||||
send_inherent_data_bg(ctx, &state, return_senders, metrics.clone()).await?;
|
||||
@@ -188,11 +198,19 @@ async fn handle_communication<Context>(
|
||||
) -> Result<(), Error> {
|
||||
match message {
|
||||
ProvisionerMessage::RequestInherentData(relay_parent, return_sender) => {
|
||||
gum::trace!(target: LOG_TARGET, ?relay_parent, "Inherent data got requested.");
|
||||
|
||||
if let Some(state) = per_relay_parent.get_mut(&relay_parent) {
|
||||
if state.is_inherent_ready {
|
||||
gum::trace!(target: LOG_TARGET, ?relay_parent, "Calling send_inherent_data.");
|
||||
send_inherent_data_bg(ctx, &state, vec![return_sender], metrics.clone())
|
||||
.await?;
|
||||
} else {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?relay_parent,
|
||||
"Queuing inherent data request (inherent data not yet ready)."
|
||||
);
|
||||
state.awaiting_inherent.push(return_sender);
|
||||
}
|
||||
}
|
||||
@@ -202,6 +220,8 @@ async fn handle_communication<Context>(
|
||||
let span = state.span.child("provisionable-data");
|
||||
let _timer = metrics.time_provisionable_data();
|
||||
|
||||
gum::trace!(target: LOG_TARGET, ?relay_parent, "Received provisionable data.");
|
||||
|
||||
note_provisionable_data(state, &span, data);
|
||||
}
|
||||
},
|
||||
@@ -228,28 +248,42 @@ async fn send_inherent_data_bg<Context>(
|
||||
let _span = span;
|
||||
let _timer = metrics.time_request_inherent_data();
|
||||
|
||||
if let Err(err) = send_inherent_data(
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = ?leaf.hash,
|
||||
"Sending inherent data in background."
|
||||
);
|
||||
|
||||
let send_result = send_inherent_data(
|
||||
&leaf,
|
||||
&signed_bitfields,
|
||||
&backed_candidates,
|
||||
return_senders,
|
||||
&mut sender,
|
||||
&metrics,
|
||||
)
|
||||
.await
|
||||
{
|
||||
gum::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data");
|
||||
metrics.on_inherent_data_request(Err(()));
|
||||
} else {
|
||||
metrics.on_inherent_data_request(Ok(()));
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
signed_bitfield_count = signed_bitfields.len(),
|
||||
backed_candidates_count = backed_candidates.len(),
|
||||
leaf_hash = ?leaf.hash,
|
||||
"inherent data sent successfully"
|
||||
);
|
||||
metrics.observe_inherent_data_bitfields_count(signed_bitfields.len());
|
||||
) // Make sure call is not taking forever:
|
||||
.timeout(SEND_INHERENT_DATA_TIMEOUT)
|
||||
.map(|v| match v {
|
||||
Some(r) => r,
|
||||
None => Err(Error::SendInherentDataTimeout),
|
||||
});
|
||||
|
||||
match send_result.await {
|
||||
Err(err) => {
|
||||
gum::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data");
|
||||
metrics.on_inherent_data_request(Err(()));
|
||||
},
|
||||
Ok(()) => {
|
||||
metrics.on_inherent_data_request(Ok(()));
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
signed_bitfield_count = signed_bitfields.len(),
|
||||
backed_candidates_count = backed_candidates.len(),
|
||||
leaf_hash = ?leaf.hash,
|
||||
"inherent data sent successfully"
|
||||
);
|
||||
metrics.observe_inherent_data_bitfields_count(signed_bitfields.len());
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
@@ -312,12 +346,27 @@ async fn send_inherent_data(
|
||||
from_job: &mut impl overseer::ProvisionerSenderTrait,
|
||||
metrics: &Metrics,
|
||||
) -> Result<(), Error> {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = ?leaf.hash,
|
||||
"Requesting availability cores"
|
||||
);
|
||||
let availability_cores = request_availability_cores(leaf.hash, from_job)
|
||||
.await
|
||||
.await
|
||||
.map_err(|err| Error::CanceledAvailabilityCores(err))??;
|
||||
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = ?leaf.hash,
|
||||
"Selecting disputes"
|
||||
);
|
||||
let disputes = select_disputes(from_job, metrics, leaf).await?;
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = ?leaf.hash,
|
||||
"Selected disputes"
|
||||
);
|
||||
|
||||
// Only include bitfields on fresh leaves. On chain reversions, we want to make sure that
|
||||
// there will be at least one block, which cannot get disputed, so the chain can make progress.
|
||||
@@ -326,9 +375,21 @@ async fn send_inherent_data(
|
||||
select_availability_bitfields(&availability_cores, bitfields, &leaf.hash),
|
||||
LeafStatus::Stale => Vec::new(),
|
||||
};
|
||||
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = ?leaf.hash,
|
||||
"Selected bitfields"
|
||||
);
|
||||
let candidates =
|
||||
select_candidates(&availability_cores, &bitfields, candidates, leaf.hash, from_job).await?;
|
||||
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = ?leaf.hash,
|
||||
"Selected candidates"
|
||||
);
|
||||
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
availability_cores_len = availability_cores.len(),
|
||||
@@ -342,6 +403,12 @@ async fn send_inherent_data(
|
||||
let inherent_data =
|
||||
ProvisionerInherentData { bitfields, backed_candidates: candidates, disputes };
|
||||
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = ?leaf.hash,
|
||||
"Sending back inherent data to requesters."
|
||||
);
|
||||
|
||||
for return_sender in return_senders {
|
||||
return_sender
|
||||
.send(inherent_data.clone())
|
||||
@@ -765,6 +832,12 @@ async fn select_disputes(
|
||||
active
|
||||
};
|
||||
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = ?_leaf.hash,
|
||||
"Request recent disputes"
|
||||
);
|
||||
|
||||
// We use `RecentDisputes` instead of `ActiveDisputes` because redundancy is fine.
|
||||
// It's heavier than `ActiveDisputes` but ensures that everything from the dispute
|
||||
// window gets on-chain, unlike `ActiveDisputes`.
|
||||
@@ -773,6 +846,18 @@ async fn select_disputes(
|
||||
// If the active ones are already exceeding the bounds, randomly select a subset.
|
||||
let recent = request_disputes(sender, RequestType::Recent).await;
|
||||
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_paent = ?_leaf.hash,
|
||||
"Received recent disputes"
|
||||
);
|
||||
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_paent = ?_leaf.hash,
|
||||
"Request on chain disputes"
|
||||
);
|
||||
|
||||
// On chain disputes are fetched from the runtime. We want to prioritise the inclusion of unknown
|
||||
// disputes in the inherent data. The call relies on staging Runtime API. If the staging API is not
|
||||
// enabled in the binary an empty set is generated which doesn't affect the rest of the logic.
|
||||
@@ -788,6 +873,18 @@ async fn select_disputes(
|
||||
},
|
||||
};
|
||||
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_paent = ?_leaf.hash,
|
||||
"Received on chain disputes"
|
||||
);
|
||||
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_paent = ?_leaf.hash,
|
||||
"Filtering disputes"
|
||||
);
|
||||
|
||||
let disputes = if recent.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
@@ -805,20 +902,34 @@ async fn select_disputes(
|
||||
recent
|
||||
};
|
||||
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_paent = ?_leaf.hash,
|
||||
"Calling `request_votes`"
|
||||
);
|
||||
|
||||
// Load all votes for all disputes from the coordinator.
|
||||
let dispute_candidate_votes = request_votes(sender, disputes).await;
|
||||
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
relay_paent = ?_leaf.hash,
|
||||
"Finished `request_votes`"
|
||||
);
|
||||
|
||||
// Transform all `CandidateVotes` into `MultiDisputeStatementSet`.
|
||||
Ok(dispute_candidate_votes
|
||||
.into_iter()
|
||||
.map(|(session_index, candidate_hash, votes)| {
|
||||
let valid_statements =
|
||||
votes.valid.into_iter().map(|(s, i, sig)| (DisputeStatement::Valid(s), i, sig));
|
||||
let valid_statements = votes
|
||||
.valid
|
||||
.into_iter()
|
||||
.map(|(i, (s, sig))| (DisputeStatement::Valid(s), i, sig));
|
||||
|
||||
let invalid_statements = votes
|
||||
.invalid
|
||||
.into_iter()
|
||||
.map(|(s, i, sig)| (DisputeStatement::Invalid(s), i, sig));
|
||||
.map(|(i, (s, sig))| (DisputeStatement::Invalid(s), i, sig));
|
||||
|
||||
metrics.inc_valid_statements_by(valid_statements.len());
|
||||
metrics.inc_invalid_statements_by(invalid_statements.len());
|
||||
|
||||
@@ -571,8 +571,8 @@ mod select_disputes {
|
||||
let mut res = Vec::new();
|
||||
let v = CandidateVotes {
|
||||
candidate_receipt: test_helpers::dummy_candidate_receipt(leaf.hash.clone()),
|
||||
valid: vec![],
|
||||
invalid: vec![],
|
||||
valid: BTreeMap::new(),
|
||||
invalid: BTreeMap::new(),
|
||||
};
|
||||
for r in disputes.iter() {
|
||||
res.push((r.0, r.1, v.clone()));
|
||||
|
||||
@@ -1210,6 +1210,49 @@ impl State {
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve approval signatures from state for the given relay block/indices:
|
||||
fn get_approval_signatures(
|
||||
&mut self,
|
||||
indices: HashSet<(Hash, CandidateIndex)>,
|
||||
) -> HashMap<ValidatorIndex, ValidatorSignature> {
|
||||
let mut all_sigs = HashMap::new();
|
||||
for (hash, index) in indices {
|
||||
let block_entry = match self.blocks.get(&hash) {
|
||||
None => {
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
?hash,
|
||||
"`get_approval_signatures`: could not find block entry for given hash!"
|
||||
);
|
||||
continue
|
||||
},
|
||||
Some(e) => e,
|
||||
};
|
||||
|
||||
let candidate_entry = match block_entry.candidates.get(index as usize) {
|
||||
None => {
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
?hash,
|
||||
?index,
|
||||
"`get_approval_signatures`: could not find candidate entry for given hash and index!"
|
||||
);
|
||||
continue
|
||||
},
|
||||
Some(e) => e,
|
||||
};
|
||||
let sigs =
|
||||
candidate_entry.messages.iter().filter_map(|(validator_index, message_state)| {
|
||||
match &message_state.approval_state {
|
||||
ApprovalState::Approved(_, sig) => Some((*validator_index, sig.clone())),
|
||||
ApprovalState::Assigned(_) => None,
|
||||
}
|
||||
});
|
||||
all_sigs.extend(sigs);
|
||||
}
|
||||
all_sigs
|
||||
}
|
||||
|
||||
async fn unify_with_peer(
|
||||
sender: &mut impl overseer::ApprovalDistributionSenderTrait,
|
||||
metrics: &Metrics,
|
||||
@@ -1681,6 +1724,15 @@ impl ApprovalDistribution {
|
||||
.import_and_circulate_approval(ctx, metrics, MessageSource::Local, vote)
|
||||
.await;
|
||||
},
|
||||
ApprovalDistributionMessage::GetApprovalSignatures(indices, tx) => {
|
||||
let sigs = state.get_approval_signatures(indices);
|
||||
if let Err(_) = tx.send(sigs) {
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
"Sending back approval signatures failed, oneshot got closed"
|
||||
);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -264,10 +264,8 @@ where
|
||||
};
|
||||
|
||||
let (pending_confirmation, confirmation_rx) = oneshot::channel();
|
||||
let candidate_hash = candidate_receipt.hash();
|
||||
self.sender
|
||||
.send_message(DisputeCoordinatorMessage::ImportStatements {
|
||||
candidate_hash,
|
||||
candidate_receipt,
|
||||
session: valid_vote.0.session_index(),
|
||||
statements: vec![valid_vote, invalid_vote],
|
||||
|
||||
@@ -231,24 +231,25 @@ impl DisputeSender {
|
||||
Some(votes) => votes,
|
||||
};
|
||||
|
||||
let our_valid_vote = votes.valid.iter().find(|(_, i, _)| *i == our_index);
|
||||
let our_valid_vote = votes.valid.get(&our_index);
|
||||
|
||||
let our_invalid_vote = votes.invalid.iter().find(|(_, i, _)| *i == our_index);
|
||||
let our_invalid_vote = votes.invalid.get(&our_index);
|
||||
|
||||
let (valid_vote, invalid_vote) = if let Some(our_valid_vote) = our_valid_vote {
|
||||
// Get some invalid vote as well:
|
||||
let invalid_vote =
|
||||
votes.invalid.get(0).ok_or(JfyiError::MissingVotesFromCoordinator)?;
|
||||
(our_valid_vote, invalid_vote)
|
||||
votes.invalid.iter().next().ok_or(JfyiError::MissingVotesFromCoordinator)?;
|
||||
((&our_index, our_valid_vote), invalid_vote)
|
||||
} else if let Some(our_invalid_vote) = our_invalid_vote {
|
||||
// Get some valid vote as well:
|
||||
let valid_vote = votes.valid.get(0).ok_or(JfyiError::MissingVotesFromCoordinator)?;
|
||||
(valid_vote, our_invalid_vote)
|
||||
let valid_vote =
|
||||
votes.valid.iter().next().ok_or(JfyiError::MissingVotesFromCoordinator)?;
|
||||
(valid_vote, (&our_index, our_invalid_vote))
|
||||
} else {
|
||||
// There is no vote from us yet - nothing to do.
|
||||
return Ok(())
|
||||
};
|
||||
let (kind, valid_index, signature) = valid_vote;
|
||||
let (valid_index, (kind, signature)) = valid_vote;
|
||||
let valid_public = info
|
||||
.session_info
|
||||
.validators
|
||||
@@ -263,7 +264,7 @@ impl DisputeSender {
|
||||
)
|
||||
.map_err(|()| JfyiError::InvalidStatementFromCoordinator)?;
|
||||
|
||||
let (kind, invalid_index, signature) = invalid_vote;
|
||||
let (invalid_index, (kind, signature)) = invalid_vote;
|
||||
let invalid_public = info
|
||||
.session_info
|
||||
.validators
|
||||
|
||||
@@ -274,16 +274,19 @@ fn disputes_are_recovered_at_startup() {
|
||||
let unchecked: UncheckedDisputeMessage = message.into();
|
||||
tx.send(vec![(session_index, candidate_hash, CandidateVotes {
|
||||
candidate_receipt: candidate,
|
||||
valid: vec![(
|
||||
unchecked.valid_vote.kind,
|
||||
valid: [(
|
||||
unchecked.valid_vote.validator_index,
|
||||
(unchecked.valid_vote.kind,
|
||||
unchecked.valid_vote.signature
|
||||
)],
|
||||
invalid: vec![(
|
||||
unchecked.invalid_vote.kind,
|
||||
),
|
||||
)].into_iter().collect(),
|
||||
invalid: [(
|
||||
unchecked.invalid_vote.validator_index,
|
||||
(
|
||||
unchecked.invalid_vote.kind,
|
||||
unchecked.invalid_vote.signature
|
||||
)],
|
||||
),
|
||||
)].into_iter().collect(),
|
||||
})])
|
||||
.expect("Receiver should stay alive.");
|
||||
}
|
||||
@@ -522,16 +525,15 @@ async fn nested_network_dispute_request<'a, F, O>(
|
||||
handle.recv().await,
|
||||
AllMessages::DisputeCoordinator(
|
||||
DisputeCoordinatorMessage::ImportStatements {
|
||||
candidate_hash,
|
||||
candidate_receipt,
|
||||
session,
|
||||
statements,
|
||||
pending_confirmation: Some(pending_confirmation),
|
||||
}
|
||||
) => {
|
||||
let candidate_hash = candidate_receipt.hash();
|
||||
assert_eq!(session, MOCK_SESSION_INDEX);
|
||||
assert_eq!(candidate_hash, message.0.candidate_receipt.hash());
|
||||
assert_eq!(candidate_hash, candidate_receipt.hash());
|
||||
assert_eq!(statements.len(), 2);
|
||||
pending_confirmation
|
||||
}
|
||||
|
||||
@@ -467,7 +467,6 @@ pub struct Overseer<SupportsParachains> {
|
||||
StatementDistributionMessage,
|
||||
ProvisionerMessage,
|
||||
RuntimeApiMessage,
|
||||
DisputeCoordinatorMessage,
|
||||
])]
|
||||
candidate_backing: CandidateBacking,
|
||||
|
||||
@@ -562,13 +561,13 @@ pub struct Overseer<SupportsParachains> {
|
||||
approval_distribution: ApprovalDistribution,
|
||||
|
||||
#[subsystem(blocking, ApprovalVotingMessage, sends: [
|
||||
RuntimeApiMessage,
|
||||
ApprovalDistributionMessage,
|
||||
AvailabilityRecoveryMessage,
|
||||
CandidateValidationMessage,
|
||||
ChainApiMessage,
|
||||
ChainSelectionMessage,
|
||||
DisputeCoordinatorMessage,
|
||||
AvailabilityRecoveryMessage,
|
||||
ApprovalDistributionMessage,
|
||||
CandidateValidationMessage,
|
||||
RuntimeApiMessage,
|
||||
])]
|
||||
approval_voting: ApprovalVoting,
|
||||
|
||||
@@ -585,6 +584,7 @@ pub struct Overseer<SupportsParachains> {
|
||||
ChainApiMessage,
|
||||
DisputeDistributionMessage,
|
||||
CandidateValidationMessage,
|
||||
ApprovalVotingMessage,
|
||||
AvailabilityStoreMessage,
|
||||
AvailabilityRecoveryMessage,
|
||||
])]
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
|
||||
use parity_scale_codec::{Decode, Encode};
|
||||
|
||||
use sp_application_crypto::AppKey;
|
||||
@@ -45,21 +47,23 @@ pub struct CandidateVotes {
|
||||
/// The receipt of the candidate itself.
|
||||
pub candidate_receipt: CandidateReceipt,
|
||||
/// Votes of validity, sorted by validator index.
|
||||
pub valid: Vec<(ValidDisputeStatementKind, ValidatorIndex, ValidatorSignature)>,
|
||||
pub valid: BTreeMap<ValidatorIndex, (ValidDisputeStatementKind, ValidatorSignature)>,
|
||||
/// Votes of invalidity, sorted by validator index.
|
||||
pub invalid: Vec<(InvalidDisputeStatementKind, ValidatorIndex, ValidatorSignature)>,
|
||||
pub invalid: BTreeMap<ValidatorIndex, (InvalidDisputeStatementKind, ValidatorSignature)>,
|
||||
}
|
||||
|
||||
/// Type alias for retrieving valid votes from `CandidateVotes`
|
||||
pub type ValidVoteData = (ValidatorIndex, (ValidDisputeStatementKind, ValidatorSignature));
|
||||
|
||||
/// Type alias for retrieving invalid votes from `CandidateVotes`
|
||||
pub type InvalidVoteData = (ValidatorIndex, (InvalidDisputeStatementKind, ValidatorSignature));
|
||||
|
||||
impl CandidateVotes {
|
||||
/// Get the set of all validators who have votes in the set, ascending.
|
||||
pub fn voted_indices(&self) -> Vec<ValidatorIndex> {
|
||||
let mut v: Vec<_> =
|
||||
self.valid.iter().map(|x| x.1).chain(self.invalid.iter().map(|x| x.1)).collect();
|
||||
|
||||
v.sort();
|
||||
v.dedup();
|
||||
|
||||
v
|
||||
pub fn voted_indices(&self) -> BTreeSet<ValidatorIndex> {
|
||||
let mut keys: BTreeSet<_> = self.valid.keys().cloned().collect();
|
||||
keys.extend(self.invalid.keys().cloned());
|
||||
keys
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,6 +166,11 @@ impl SignedDisputeStatement {
|
||||
&self.validator_signature
|
||||
}
|
||||
|
||||
/// Consume self to return the signature.
|
||||
pub fn into_validator_signature(self) -> ValidatorSignature {
|
||||
self.validator_signature
|
||||
}
|
||||
|
||||
/// Access the underlying session index.
|
||||
pub fn session_index(&self) -> SessionIndex {
|
||||
self.session_index
|
||||
|
||||
@@ -243,8 +243,6 @@ pub enum DisputeCoordinatorMessage {
|
||||
///
|
||||
/// This does not do any checking of the message signature.
|
||||
ImportStatements {
|
||||
/// The hash of the candidate.
|
||||
candidate_hash: CandidateHash,
|
||||
/// The candidate receipt itself.
|
||||
candidate_receipt: CandidateReceipt,
|
||||
/// The session the candidate appears in.
|
||||
@@ -275,7 +273,7 @@ pub enum DisputeCoordinatorMessage {
|
||||
/// and which may have already concluded.
|
||||
RecentDisputes(oneshot::Sender<Vec<(SessionIndex, CandidateHash)>>),
|
||||
/// Fetch a list of all active disputes that the coordinator is aware of.
|
||||
/// These disputes are either unconcluded or recently concluded.
|
||||
/// These disputes are either not yet concluded or recently concluded.
|
||||
ActiveDisputes(oneshot::Sender<Vec<(SessionIndex, CandidateHash)>>),
|
||||
/// Get candidate votes for a candidate.
|
||||
QueryCandidateVotes(
|
||||
@@ -908,6 +906,15 @@ pub enum ApprovalVotingMessage {
|
||||
/// It can also return the same block hash, if that is acceptable to vote upon.
|
||||
/// Return `None` if the input hash is unrecognized.
|
||||
ApprovedAncestor(Hash, BlockNumber, oneshot::Sender<Option<HighestApprovedAncestorBlock>>),
|
||||
|
||||
/// Retrieve all available approval signatures for a candidate from approval-voting.
|
||||
///
|
||||
/// This message involves a linear search for candidates on each relay chain fork and also
|
||||
/// requires calling into `approval-distribution`: Calls should be infrequent and bounded.
|
||||
GetApprovalSignaturesForCandidate(
|
||||
CandidateHash,
|
||||
oneshot::Sender<HashMap<ValidatorIndex, ValidatorSignature>>,
|
||||
),
|
||||
}
|
||||
|
||||
/// Message to the Approval Distribution subsystem.
|
||||
@@ -926,6 +933,12 @@ pub enum ApprovalDistributionMessage {
|
||||
/// An update from the network bridge.
|
||||
#[from]
|
||||
NetworkBridgeUpdate(NetworkBridgeEvent<net_protocol::ApprovalDistributionMessage>),
|
||||
|
||||
/// Get all approval signatures for all chains a candidate appeared in.
|
||||
GetApprovalSignatures(
|
||||
HashSet<(Hash, CandidateIndex)>,
|
||||
oneshot::Sender<HashMap<ValidatorIndex, ValidatorSignature>>,
|
||||
),
|
||||
}
|
||||
|
||||
/// Message to the Gossip Support subsystem.
|
||||
|
||||
Reference in New Issue
Block a user