Dispute coordinator - Recover disputes on startup (#3481)

* node/dispute-coordinator: Introduce resume capability

This commit introduces a resume capability for the
dispute coordinator subsystem. Specifically, this will allow
to recover data for disputes for which we have no local statements.

* node/dispute-coordinator: Add resume function to TestState and modify Harness

This commit modifies the TestHarness to return a TestState. We subsequently
define a resume function on TestState that allows to interrupt the test and
test specifically for behavior on startup of the subsystem.

* node/dispute-coordinator: Implement resume functionality

This commit implements the resume functionality for the subsystem.
In addition, we will forward any DisputeParticipation::Participate
message in order to ensure that disputes for which we do not have
local statements may be recovered in due time.

* Address Feedback

* Modify to run handle_leaf on first import

* Modify missing_local_statement logic

* node/dispute-coordinator: Add simple test to ensure we adequately
handle local_statements that are not missing.

* Add missing keystore tests
This commit is contained in:
Lldenaurois
2021-07-21 17:36:43 -04:00
committed by GitHub
parent 7aaaa9ec08
commit 1858ff57fc
4 changed files with 698 additions and 32 deletions
+1
View File
@@ -6337,6 +6337,7 @@ dependencies = [
"polkadot-node-subsystem", "polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util", "polkadot-node-subsystem-util",
"polkadot-overseer",
"polkadot-primitives", "polkadot-primitives",
"sc-keystore", "sc-keystore",
"sp-core", "sp-core",
@@ -27,3 +27,4 @@ sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
assert_matches = "1.4.0" assert_matches = "1.4.0"
polkadot-overseer = { path = "../../overseer" }
@@ -35,7 +35,7 @@ use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError}, errors::{ChainApiError, RuntimeApiError},
messages::{ messages::{
ChainApiMessage, DisputeCoordinatorMessage, DisputeDistributionMessage, ChainApiMessage, DisputeCoordinatorMessage, DisputeDistributionMessage,
DisputeParticipationMessage, ImportStatementsResult DisputeParticipationMessage, ImportStatementsResult,
} }
}; };
use polkadot_node_subsystem_util::rolling_session_window::{ use polkadot_node_subsystem_util::rolling_session_window::{
@@ -71,10 +71,27 @@ const ACTIVE_DURATION_SECS: Timestamp = 180;
/// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots. /// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots.
type Timestamp = u64; type Timestamp = u64;
#[derive(Eq, PartialEq)]
enum Participation {
Pending,
Complete,
}
impl Participation {
fn complete(&mut self) -> bool {
let complete = *self == Participation::Complete;
if !complete {
*self = Participation::Complete
}
complete
}
}
struct State { struct State {
keystore: Arc<LocalKeystore>, keystore: Arc<LocalKeystore>,
highest_session: Option<SessionIndex>, highest_session: Option<SessionIndex>,
rolling_session_window: RollingSessionWindow, rolling_session_window: RollingSessionWindow,
recovery_state: Participation,
} }
/// Configuration for the dispute coordinator subsystem. /// Configuration for the dispute coordinator subsystem.
@@ -277,7 +294,7 @@ where
B: Backend, B: Backend,
{ {
loop { loop {
let res = run_iteration(&mut ctx, &subsystem, &mut backend, &*clock).await; let res = run_until_error(&mut ctx, &subsystem, &mut backend, &*clock).await;
match res { match res {
Err(e) => { Err(e) => {
e.trace(); e.trace();
@@ -299,7 +316,7 @@ where
// //
// A return value of `Ok` indicates that an exit should be made, while non-fatal errors // A return value of `Ok` indicates that an exit should be made, while non-fatal errors
// lead to another call to this function. // lead to another call to this function.
async fn run_iteration<B, Context>( async fn run_until_error<B, Context>(
ctx: &mut Context, ctx: &mut Context,
subsystem: &DisputeCoordinatorSubsystem, subsystem: &DisputeCoordinatorSubsystem,
backend: &mut B, backend: &mut B,
@@ -314,6 +331,7 @@ where
keystore: subsystem.keystore.clone(), keystore: subsystem.keystore.clone(),
highest_session: None, highest_session: None,
rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW), rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW),
recovery_state: Participation::Pending,
}; };
loop { loop {
@@ -328,7 +346,14 @@ where
&mut overlay_db, &mut overlay_db,
&mut state, &mut state,
update.activated.into_iter().map(|a| a.hash), update.activated.into_iter().map(|a| a.hash),
).await? ).await?;
if !state.recovery_state.complete() {
handle_startup(
ctx,
&mut overlay_db,
&mut state,
).await?;
}
} }
FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {}, FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
FromOverseer::Communication { msg } => { FromOverseer::Communication { msg } => {
@@ -349,6 +374,98 @@ where
} }
} }
// Restores the subsystem's state before proceeding with the main event loop. Primarily, this
// repopulates the rolling session window the relevant session information to handle incoming
// import statement requests.
//
// This method also retransmits a `DisputeParticiationMessage::Participate` for any non-concluded
// disputes for which the subsystem doesn't have a local statement, ensuring it eventually makes an
// arbitration on the dispute.
async fn handle_startup<Context>(
ctx: &mut Context,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
) -> Result<(), Error>
where
Context: overseer::SubsystemContext<Message = DisputeCoordinatorMessage>,
Context: SubsystemContext<Message = DisputeCoordinatorMessage>,
{
let recent_disputes = match overlay_db.load_recent_disputes() {
Ok(Some(disputes)) => disputes,
Ok(None) => return Ok(()),
Err(e) => {
tracing::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e);
return Err(e.into());
},
};
// Filter out disputes that have already concluded.
let active_disputes = recent_disputes.into_iter()
.filter(|(_, status)| *status == DisputeStatus::Active)
.collect::<RecentDisputes>();
for ((session, ref candidate_hash), _) in active_disputes.into_iter() {
let votes: CandidateVotes = match overlay_db.load_candidate_votes(session, candidate_hash) {
Ok(Some(votes)) => votes.into(),
Ok(None) => continue,
Err(e) => {
tracing::error!(target: LOG_TARGET, "Failed initial load of candidate votes: {:?}", e);
continue
},
};
let validators = match state.rolling_session_window.session_info(session) {
None => {
tracing::warn!(
target: LOG_TARGET,
session,
"Missing info for session which has an active dispute",
);
continue
}
Some(info) => info.validators.clone(),
};
let n_validators = validators.len();
let voted_indices: HashSet<_> = votes.voted_indices().into_iter().collect();
// Determine if there are any missing local statements for this dispute. Validators are
// filtered if:
// 1) their statement already exists, or
// 2) the validator key is not in the local keystore (i.e. the validator is remote).
// The remaining set only contains local validators that are also missing statements.
let missing_local_statement = validators.iter()
.enumerate()
.map(|(index, validator)| (ValidatorIndex(index as _), validator))
.any(|(index, validator)|
!voted_indices.contains(&index) &&
state.keystore
.key_pair::<ValidatorPair>(validator)
.ok()
.map_or(false, |v| v.is_some())
);
// Send a `DisputeParticipationMessage` for all non-concluded disputes which do not have a
// recorded local statement.
if missing_local_statement {
let (report_availability, receive_availability) = oneshot::channel();
ctx.send_message(DisputeParticipationMessage::Participate {
candidate_hash: *candidate_hash,
candidate_receipt: votes.candidate_receipt.clone(),
session,
n_validators: n_validators as u32,
report_availability,
}).await;
if !receive_availability.await? {
tracing::debug!(target: LOG_TARGET, "Participation failed. Candidate not available");
}
}
}
Ok(())
}
async fn handle_new_activations( async fn handle_new_activations(
ctx: &mut (impl SubsystemContext<Message = DisputeCoordinatorMessage> + overseer::SubsystemContext<Message = DisputeCoordinatorMessage>), ctx: &mut (impl SubsystemContext<Message = DisputeCoordinatorMessage> + overseer::SubsystemContext<Message = DisputeCoordinatorMessage>),
overlay_db: &mut OverlayedBackend<'_, impl Backend>, overlay_db: &mut OverlayedBackend<'_, impl Backend>,
@@ -470,8 +587,8 @@ async fn handle_incoming(
) => { ) => {
issue_local_statement( issue_local_statement(
ctx, ctx,
state,
overlay_db, overlay_db,
state,
candidate_hash, candidate_hash,
candidate_receipt, candidate_receipt,
session, session,
@@ -547,6 +664,10 @@ async fn handle_import_statements(
"Missing info for session which has an active dispute", "Missing info for session which has an active dispute",
); );
pending_confirmation
.send(ImportStatementsResult::InvalidImport)
.map_err(|_| Error::OneshotSend)?;
return Ok(()) return Ok(())
} }
Some(info) => info.validators.clone(), Some(info) => info.validators.clone(),
@@ -671,13 +792,17 @@ async fn handle_import_statements(
overlay_db.write_candidate_votes(session, candidate_hash, votes.into()); overlay_db.write_candidate_votes(session, candidate_hash, votes.into());
pending_confirmation
.send(ImportStatementsResult::ValidImport)
.map_err(|_| Error::OneshotSend)?;
Ok(()) Ok(())
} }
async fn issue_local_statement( async fn issue_local_statement(
ctx: &mut impl SubsystemContext, ctx: &mut impl SubsystemContext,
state: &mut State,
overlay_db: &mut OverlayedBackend<'_, impl Backend>, overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
candidate_hash: CandidateHash, candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt, candidate_receipt: CandidateReceipt,
session: SessionIndex, session: SessionIndex,
@@ -14,8 +14,10 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use super::*; use super::*;
use overseer::TimeoutExt;
use polkadot_primitives::v1::{BlakeTwo256, HashT, ValidatorId, Header, SessionInfo}; use polkadot_primitives::v1::{BlakeTwo256, HashT, ValidatorId, Header, SessionInfo};
use polkadot_node_subsystem::{jaeger, ActiveLeavesUpdate, ActivatedLeaf, LeafStatus}; use polkadot_node_subsystem::{jaeger, ActiveLeavesUpdate, ActivatedLeaf, LeafStatus};
use polkadot_node_subsystem::messages::{ use polkadot_node_subsystem::messages::{
@@ -33,6 +35,9 @@ use parity_scale_codec::Encode;
use assert_matches::assert_matches; use assert_matches::assert_matches;
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::time::Duration;
const TEST_TIMEOUT: Duration = Duration::from_secs(2);
// sets up a keystore with the given keyring accounts. // sets up a keystore with the given keyring accounts.
fn make_keystore(accounts: &[Sr25519Keyring]) -> LocalKeystore { fn make_keystore(accounts: &[Sr25519Keyring]) -> LocalKeystore {
@@ -86,6 +91,7 @@ struct TestState {
db: Arc<dyn KeyValueDB>, db: Arc<dyn KeyValueDB>,
config: Config, config: Config,
clock: MockClock, clock: MockClock,
headers: HashMap<Hash, Header>,
} }
impl Default for TestState { impl Default for TestState {
@@ -126,13 +132,14 @@ impl Default for TestState {
db, db,
config, config,
clock: MockClock::default(), clock: MockClock::default(),
headers: HashMap::new(),
} }
} }
} }
impl TestState { impl TestState {
async fn activate_leaf_at_session( async fn activate_leaf_at_session(
&self, &mut self,
virtual_overseer: &mut VirtualOverseer, virtual_overseer: &mut VirtualOverseer,
session: SessionIndex, session: SessionIndex,
block_number: BlockNumber, block_number: BlockNumber,
@@ -149,6 +156,8 @@ impl TestState {
}; };
let block_hash = block_header.hash(); let block_hash = block_header.hash();
let _ = self.headers.insert(block_hash, block_header.clone());
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves( virtual_overseer.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf { ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: block_hash, hash: block_hash,
@@ -158,6 +167,16 @@ impl TestState {
}) })
))).await; ))).await;
self.handle_sync_queries(virtual_overseer, block_hash, block_header, session).await;
}
async fn handle_sync_queries(
&self,
virtual_overseer: &mut VirtualOverseer,
block_hash: Hash,
block_header: Header,
session: SessionIndex,
) {
assert_matches!( assert_matches!(
virtual_overseer.recv().await, virtual_overseer.recv().await,
AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => { AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
@@ -172,6 +191,7 @@ impl TestState {
h, h,
RuntimeApiRequest::SessionIndexForChild(tx), RuntimeApiRequest::SessionIndexForChild(tx),
)) => { )) => {
let parent_hash = session_to_hash(session, b"parent");
assert_eq!(h, parent_hash); assert_eq!(h, parent_hash);
let _ = tx.send(Ok(session)); let _ = tx.send(Ok(session));
} }
@@ -194,6 +214,29 @@ impl TestState {
} }
} }
async fn handle_resume_sync(&self, virtual_overseer: &mut VirtualOverseer, session: SessionIndex) {
let leaves: Vec<Hash> = self.headers.keys().cloned().collect();
for leaf in leaves.iter() {
virtual_overseer.send(
FromOverseer::Signal(
OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(
ActivatedLeaf {
hash: *leaf,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
}
)
)
)
).await;
let header = self.headers.get(leaf).unwrap().clone();
self.handle_sync_queries(virtual_overseer, *leaf, header, session).await;
}
}
fn session_info(&self) -> SessionInfo { fn session_info(&self) -> SessionInfo {
let discovery_keys = self.validators.iter() let discovery_keys = self.validators.iter()
.map(|k| <_>::from(k.public())) .map(|k| <_>::from(k.public()))
@@ -236,32 +279,38 @@ impl TestState {
public, public,
).await.unwrap().unwrap() ).await.unwrap().unwrap()
} }
fn resume<F>(self, test: F) -> Self
where F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, TestState>
{
let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new());
let subsystem = DisputeCoordinatorSubsystem::new(
self.db.clone(),
self.config.clone(),
self.subsystem_keystore.clone(),
);
let backend = DbBackend::new(self.db.clone(), self.config.column_config());
let subsystem_task = run(subsystem, ctx, backend, Box::new(self.clock.clone()));
let test_task = test(self, ctx_handle);
let (_, state) = futures::executor::block_on(future::join(subsystem_task, test_task));
state
}
} }
fn test_harness<F>(test: F) fn test_harness<F>(test: F) -> TestState
where F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, ()> where F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, TestState>
{ {
let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new()); TestState::default().resume(test)
let state = TestState::default();
let subsystem = DisputeCoordinatorSubsystem::new(
state.db.clone(),
state.config.clone(),
state.subsystem_keystore.clone(),
);
let backend = DbBackend::new(state.db.clone(), state.config.column_config());
let subsystem_task = run(subsystem, ctx, backend, Box::new(state.clock.clone()));
let test_task = test(state, ctx_handle);
futures::executor::block_on(future::join(subsystem_task, test_task));
} }
#[test] #[test]
fn conflicting_votes_lead_to_dispute_participation() { fn conflicting_votes_lead_to_dispute_participation() {
test_harness(|test_state, mut virtual_overseer| Box::pin(async move { test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move {
let session = 1; let session = 1;
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default(); let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash(); let candidate_hash = candidate_receipt.hash();
@@ -374,14 +423,18 @@ fn conflicting_votes_lead_to_dispute_participation() {
// This confirms that the second vote doesn't lead to participation again. // This confirms that the second vote doesn't lead to participation again.
assert!(virtual_overseer.try_recv().await.is_none()); assert!(virtual_overseer.try_recv().await.is_none());
test_state
})); }));
} }
#[test] #[test]
fn positive_votes_dont_trigger_participation() { fn positive_votes_dont_trigger_participation() {
test_harness(|test_state, mut virtual_overseer| Box::pin(async move { test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move {
let session = 1; let session = 1;
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default(); let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash(); let candidate_hash = candidate_receipt.hash();
@@ -477,14 +530,18 @@ fn positive_votes_dont_trigger_participation() {
// This confirms that no participation request is made. // This confirms that no participation request is made.
assert!(virtual_overseer.try_recv().await.is_none()); assert!(virtual_overseer.try_recv().await.is_none());
test_state
})); }));
} }
#[test] #[test]
fn wrong_validator_index_is_ignored() { fn wrong_validator_index_is_ignored() {
test_harness(|test_state, mut virtual_overseer| Box::pin(async move { test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move {
let session = 1; let session = 1;
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default(); let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash(); let candidate_hash = candidate_receipt.hash();
@@ -547,14 +604,18 @@ fn wrong_validator_index_is_ignored() {
// This confirms that no participation request is made. // This confirms that no participation request is made.
assert!(virtual_overseer.try_recv().await.is_none()); assert!(virtual_overseer.try_recv().await.is_none());
test_state
})); }));
} }
#[test] #[test]
fn finality_votes_ignore_disputed_candidates() { fn finality_votes_ignore_disputed_candidates() {
test_harness(|test_state, mut virtual_overseer| Box::pin(async move { test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move {
let session = 1; let session = 1;
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default(); let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash(); let candidate_hash = candidate_receipt.hash();
@@ -639,14 +700,18 @@ fn finality_votes_ignore_disputed_candidates() {
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none()); assert!(virtual_overseer.try_recv().await.is_none());
test_state
})); }));
} }
#[test] #[test]
fn supermajority_valid_dispute_may_be_finalized() { fn supermajority_valid_dispute_may_be_finalized() {
test_harness(|test_state, mut virtual_overseer| Box::pin(async move { test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move {
let session = 1; let session = 1;
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default(); let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash(); let candidate_hash = candidate_receipt.hash();
@@ -688,7 +753,23 @@ fn supermajority_valid_dispute_may_be_finalized() {
}, },
}).await; }).await;
let _ = virtual_overseer.recv().await; assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeParticipation(
DisputeParticipationMessage::Participate {
candidate_hash: c_hash,
candidate_receipt: c_receipt,
session: s,
report_availability,
..
}
) => {
assert_eq!(candidate_hash, c_hash);
assert_eq!(candidate_receipt, c_receipt);
assert_eq!(session, s);
report_availability.send(true).unwrap();
}
);
let mut statements = Vec::new(); let mut statements = Vec::new();
for i in (0..supermajority_threshold - 1).map(|i| i + 2) { for i in (0..supermajority_threshold - 1).map(|i| i + 2) {
@@ -748,14 +829,18 @@ fn supermajority_valid_dispute_may_be_finalized() {
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none()); assert!(virtual_overseer.try_recv().await.is_none());
test_state
})); }));
} }
#[test] #[test]
fn concluded_supermajority_for_non_active_after_time() { fn concluded_supermajority_for_non_active_after_time() {
test_harness(|test_state, mut virtual_overseer| Box::pin(async move { test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move {
let session = 1; let session = 1;
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default(); let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash(); let candidate_hash = candidate_receipt.hash();
@@ -854,14 +939,18 @@ fn concluded_supermajority_for_non_active_after_time() {
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none()); assert!(virtual_overseer.try_recv().await.is_none());
test_state
})); }));
} }
#[test] #[test]
fn concluded_supermajority_against_non_active_after_time() { fn concluded_supermajority_against_non_active_after_time() {
test_harness(|test_state, mut virtual_overseer| Box::pin(async move { test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move {
let session = 1; let session = 1;
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default(); let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash(); let candidate_hash = candidate_receipt.hash();
@@ -960,14 +1049,18 @@ fn concluded_supermajority_against_non_active_after_time() {
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none()); assert!(virtual_overseer.try_recv().await.is_none());
test_state
})); }));
} }
#[test] #[test]
fn fresh_dispute_ignored_if_unavailable() { fn fresh_dispute_ignored_if_unavailable() {
test_harness(|test_state, mut virtual_overseer| Box::pin(async move { test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move {
let session = 1; let session = 1;
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default(); let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash(); let candidate_hash = candidate_receipt.hash();
@@ -1037,5 +1130,451 @@ fn fresh_dispute_ignored_if_unavailable() {
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none()); assert!(virtual_overseer.try_recv().await.is_none());
test_state
}));
}
#[test]
fn resume_dispute_without_local_statement() {
let session = 1;
test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash();
test_state.activate_leaf_at_session(
&mut virtual_overseer,
session,
1,
).await;
let valid_vote = test_state.issue_statement_with_index(
1,
candidate_hash,
session,
true,
).await;
let invalid_vote = test_state.issue_statement_with_index(
2,
candidate_hash,
session,
false,
).await;
let (pending_confirmation, confirmation_rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(valid_vote, ValidatorIndex(1)),
(invalid_vote, ValidatorIndex(2)),
],
pending_confirmation,
},
}).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeParticipation(
DisputeParticipationMessage::Participate {
report_availability,
..
}
) => {
report_availability.send(true).unwrap();
}
);
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
{
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
}).await;
assert_eq!(rx.await.unwrap().len(), 1);
}
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
}))
// Alice should send a DisputeParticiationMessage::Participate on restart since she has no
// local statement for the active dispute.
.resume(|test_state, mut virtual_overseer| Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash();
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeParticipation(
DisputeParticipationMessage::Participate {
candidate_hash: c_hash,
candidate_receipt: c_receipt,
session: s,
report_availability,
..
}
) => {
assert_eq!(candidate_hash, c_hash);
assert_eq!(candidate_receipt, c_receipt);
assert_eq!(session, s);
report_availability.send(true).unwrap();
}
);
let valid_vote0 = test_state.issue_statement_with_index(
0,
candidate_hash,
session,
true,
).await;
let valid_vote3 = test_state.issue_statement_with_index(
3,
candidate_hash,
session,
true,
).await;
let valid_vote4 = test_state.issue_statement_with_index(
4,
candidate_hash,
session,
true,
).await;
let valid_vote5 = test_state.issue_statement_with_index(
5,
candidate_hash,
session,
true,
).await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(valid_vote0, ValidatorIndex(0)),
(valid_vote3, ValidatorIndex(3)),
(valid_vote4, ValidatorIndex(4)),
(valid_vote5, ValidatorIndex(5)),
],
pending_confirmation,
},
}).await;
// Advance the clock far enough so that the concluded dispute will be omitted from an
// ActiveDisputes query.
test_state.clock.set(test_state.clock.now() + ACTIVE_DURATION_SECS + 1 );
{
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
}).await;
assert!(rx.await.unwrap().is_empty());
}
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
}));
}
#[test]
fn resume_dispute_with_local_statement() {
let session = 1;
test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash();
test_state.activate_leaf_at_session(
&mut virtual_overseer,
session,
1,
).await;
let local_valid_vote = test_state.issue_statement_with_index(
0,
candidate_hash,
session,
true,
).await;
let valid_vote = test_state.issue_statement_with_index(
1,
candidate_hash,
session,
true,
).await;
let invalid_vote = test_state.issue_statement_with_index(
2,
candidate_hash,
session,
false,
).await;
let (pending_confirmation, confirmation_rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(local_valid_vote, ValidatorIndex(0)),
(valid_vote, ValidatorIndex(1)),
(invalid_vote, ValidatorIndex(2)),
],
pending_confirmation,
},
}).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeParticipation(
DisputeParticipationMessage::Participate {
report_availability,
..
}
) => {
report_availability.send(true).unwrap();
}
);
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
{
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
}).await;
assert_eq!(rx.await.unwrap().len(), 1);
}
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
}))
// Alice should send a DisputeParticiationMessage::Participate on restart since she has no
// local statement for the active dispute.
.resume(|test_state, mut virtual_overseer| Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
// Assert that subsystem is not sending Participation messages because we issued a local statement
assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
}));
}
#[test]
fn resume_dispute_without_local_statement_or_local_key() {
let session = 1;
let mut test_state = TestState::default();
test_state.subsystem_keystore = make_keystore(&[Sr25519Keyring::Two]).into();
test_state.resume(|mut test_state, mut virtual_overseer| Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash();
test_state.activate_leaf_at_session(
&mut virtual_overseer,
session,
1,
).await;
let valid_vote = test_state.issue_statement_with_index(
1,
candidate_hash,
session,
true,
).await;
let invalid_vote = test_state.issue_statement_with_index(
2,
candidate_hash,
session,
false,
).await;
let (pending_confirmation, confirmation_rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(valid_vote, ValidatorIndex(1)),
(invalid_vote, ValidatorIndex(2)),
],
pending_confirmation,
},
}).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeParticipation(
DisputeParticipationMessage::Participate {
report_availability,
..
}
) => {
report_availability.send(true).unwrap();
}
);
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
{
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
}).await;
assert_eq!(rx.await.unwrap().len(), 1);
}
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
}))
// Alice should send a DisputeParticiationMessage::Participate on restart since she has no
// local statement for the active dispute.
.resume(|test_state, mut virtual_overseer| Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
// Assert that subsystem is not sending Participation messages because we issued a local statement
assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
}));
}
#[test]
fn resume_dispute_with_local_statement_without_local_key() {
let session = 1;
let mut test_state = TestState::default();
test_state.subsystem_keystore = make_keystore(&[Sr25519Keyring::Two]).into();
test_state.resume(|mut test_state, mut virtual_overseer| Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash();
test_state.activate_leaf_at_session(
&mut virtual_overseer,
session,
1,
).await;
let local_valid_vote = test_state.issue_statement_with_index(
0,
candidate_hash,
session,
true,
).await;
let valid_vote = test_state.issue_statement_with_index(
1,
candidate_hash,
session,
true,
).await;
let invalid_vote = test_state.issue_statement_with_index(
2,
candidate_hash,
session,
false,
).await;
let (pending_confirmation, confirmation_rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(local_valid_vote, ValidatorIndex(0)),
(valid_vote, ValidatorIndex(1)),
(invalid_vote, ValidatorIndex(2)),
],
pending_confirmation,
},
}).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeParticipation(
DisputeParticipationMessage::Participate {
report_availability,
..
}
) => {
report_availability.send(true).unwrap();
}
);
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
{
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
}).await;
assert_eq!(rx.await.unwrap().len(), 1);
}
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
}))
// Alice should send a DisputeParticiationMessage::Participate on restart since she has no
// local statement for the active dispute.
.resume(|test_state, mut virtual_overseer| Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
// Assert that subsystem is not sending Participation messages because we issued a local statement
assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
})); }));
} }