mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 04:01:10 +00:00
alternate availability store schema (#2237)
* alternate availability store schema * improvements * tweaks * new DB schema and skeleton * expand skeleton and tweaks * handle backing and inclusion * let finality be handled later * handle finalized blocks * implement query methods * implement chunk storing * StoreAvailableData * fix an off-by-one * implement pruning * reinstate subsystem trait impl * reinstate metrics * fix warnings * remove chunks_cache * oops * actually store the available data * mockable pruning interval * fix tests * spacing * fix code grumbles * guide improvements * make time mockable * implement a mocked clock for testing * return DB errors * Update node/core/av-store/Cargo.toml Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * Update roadmap/implementers-guide/src/node/utility/availability-store.md Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * Update roadmap/implementers-guide/src/node/utility/availability-store.md Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * review grumbles & clarity * fix review grumbles * Add docs Co-authored-by: Andronik Ordian <write@reusable.software> Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
committed by
GitHub
parent
f9ce261707
commit
aedf089691
+980
-1003
File diff suppressed because it is too large
Load Diff
@@ -26,13 +26,15 @@ use futures::{
|
||||
|
||||
use polkadot_primitives::v1::{
|
||||
AvailableData, BlockData, CandidateDescriptor, CandidateReceipt, HeadData,
|
||||
PersistedValidationData, PoV, Id as ParaId, CandidateHash,
|
||||
PersistedValidationData, PoV, Id as ParaId, CandidateHash, Header, ValidatorId,
|
||||
};
|
||||
use polkadot_node_subsystem_util::TimeoutExt;
|
||||
use polkadot_subsystem::{
|
||||
ActiveLeavesUpdate, errors::RuntimeApiError, JaegerSpan,
|
||||
ActiveLeavesUpdate, errors::RuntimeApiError, JaegerSpan, messages::AllMessages,
|
||||
};
|
||||
use polkadot_node_subsystem_test_helpers as test_helpers;
|
||||
use sp_keyring::Sr25519Keyring;
|
||||
use parking_lot::Mutex;
|
||||
|
||||
struct TestHarness {
|
||||
virtual_overseer: test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
|
||||
@@ -60,9 +62,41 @@ impl TestCandidateBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct TestClock {
|
||||
inner: Arc<Mutex<Duration>>,
|
||||
}
|
||||
|
||||
impl TestClock {
|
||||
fn now(&self) -> Duration {
|
||||
self.inner.lock().clone()
|
||||
}
|
||||
|
||||
fn inc(&self, by: Duration) {
|
||||
*self.inner.lock() += by;
|
||||
}
|
||||
}
|
||||
|
||||
impl Clock for TestClock {
|
||||
fn now(&self) -> Result<Duration, Error> {
|
||||
Ok(TestClock::now(self))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Clone)]
|
||||
struct TestState {
|
||||
persisted_validation_data: PersistedValidationData,
|
||||
pruning_config: PruningConfig,
|
||||
clock: TestClock,
|
||||
}
|
||||
|
||||
impl TestState {
|
||||
// pruning is only polled periodically, so we sometimes need to delay until
|
||||
// we're sure the subsystem has done pruning.
|
||||
async fn wait_for_pruning(&self) {
|
||||
Delay::new(self.pruning_config.pruning_interval * 2).await
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TestState {
|
||||
@@ -77,20 +111,26 @@ impl Default for TestState {
|
||||
};
|
||||
|
||||
let pruning_config = PruningConfig {
|
||||
keep_stored_block_for: Duration::from_secs(1),
|
||||
keep_finalized_block_for: Duration::from_secs(2),
|
||||
keep_finalized_chunk_for: Duration::from_secs(2),
|
||||
keep_unavailable_for: Duration::from_secs(1),
|
||||
keep_finalized_for: Duration::from_secs(2),
|
||||
pruning_interval: Duration::from_millis(250),
|
||||
};
|
||||
|
||||
let clock = TestClock {
|
||||
inner: Arc::new(Mutex::new(Duration::from_secs(0))),
|
||||
};
|
||||
|
||||
Self {
|
||||
persisted_validation_data,
|
||||
pruning_config,
|
||||
clock,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fn test_harness<T: Future<Output=()>>(
|
||||
pruning_config: PruningConfig,
|
||||
state: TestState,
|
||||
store: Arc<dyn KeyValueDB>,
|
||||
test: impl FnOnce(TestHarness) -> T,
|
||||
) {
|
||||
@@ -109,7 +149,12 @@ fn test_harness<T: Future<Output=()>>(
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
|
||||
|
||||
let subsystem = AvailabilityStoreSubsystem::new_in_memory(store, pruning_config);
|
||||
let subsystem = AvailabilityStoreSubsystem::new_in_memory(
|
||||
store,
|
||||
state.pruning_config.clone(),
|
||||
Box::new(state.clock),
|
||||
);
|
||||
|
||||
let subsystem = run(subsystem, context);
|
||||
|
||||
let test_fut = test(TestHarness {
|
||||
@@ -170,11 +215,17 @@ async fn overseer_signal(
|
||||
.expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT));
|
||||
}
|
||||
|
||||
fn with_tx(db: &Arc<impl KeyValueDB>, f: impl FnOnce(&mut DBTransaction)) {
|
||||
let mut tx = DBTransaction::new();
|
||||
f(&mut tx);
|
||||
db.write(tx).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn runtime_api_error_does_not_stop_the_subsystem() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
|
||||
test_harness(PruningConfig::default(), store, |test_harness| async move {
|
||||
test_harness(TestState::default(), store, |test_harness| async move {
|
||||
let TestHarness { mut virtual_overseer } = test_harness;
|
||||
let new_leaf = Hash::repeat_byte(0x01);
|
||||
|
||||
@@ -218,7 +269,59 @@ fn runtime_api_error_does_not_stop_the_subsystem() {
|
||||
#[test]
|
||||
fn store_chunk_works() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
test_harness(PruningConfig::default(), store.clone(), |test_harness| async move {
|
||||
test_harness(TestState::default(), store.clone(), |test_harness| async move {
|
||||
let TestHarness { mut virtual_overseer } = test_harness;
|
||||
let relay_parent = Hash::repeat_byte(32);
|
||||
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
|
||||
let validator_index = 5;
|
||||
let n_validators = 10;
|
||||
|
||||
let chunk = ErasureChunk {
|
||||
chunk: vec![1, 2, 3],
|
||||
index: validator_index,
|
||||
proof: vec![vec![3, 4, 5]],
|
||||
};
|
||||
|
||||
// Ensure an entry already exists. In reality this would come from watching
|
||||
// chain events.
|
||||
with_tx(&store, |tx| {
|
||||
super::write_meta(tx, &candidate_hash, &CandidateMeta {
|
||||
data_available: false,
|
||||
chunks_stored: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators],
|
||||
state: State::Unavailable(BETimestamp(0)),
|
||||
});
|
||||
});
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let chunk_msg = AvailabilityStoreMessage::StoreChunk {
|
||||
candidate_hash,
|
||||
relay_parent,
|
||||
chunk: chunk.clone(),
|
||||
tx,
|
||||
};
|
||||
|
||||
overseer_send(&mut virtual_overseer, chunk_msg.into()).await;
|
||||
assert_eq!(rx.await.unwrap(), Ok(()));
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let query_chunk = AvailabilityStoreMessage::QueryChunk(
|
||||
candidate_hash,
|
||||
validator_index,
|
||||
tx,
|
||||
);
|
||||
|
||||
overseer_send(&mut virtual_overseer, query_chunk.into()).await;
|
||||
|
||||
assert_eq!(rx.await.unwrap().unwrap(), chunk);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn store_chunk_does_nothing_if_no_entry_already() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
test_harness(TestState::default(), store.clone(), |test_harness| async move {
|
||||
let TestHarness { mut virtual_overseer } = test_harness;
|
||||
let relay_parent = Hash::repeat_byte(32);
|
||||
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
|
||||
@@ -240,19 +343,7 @@ fn store_chunk_works() {
|
||||
};
|
||||
|
||||
overseer_send(&mut virtual_overseer, chunk_msg.into()).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
|
||||
hash,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(hash, relay_parent);
|
||||
tx.send(Ok(Some(4))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_eq!(rx.await.unwrap(), Ok(()));
|
||||
assert_eq!(rx.await.unwrap(), Err(()));
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let query_chunk = AvailabilityStoreMessage::QueryChunk(
|
||||
@@ -263,7 +354,52 @@ fn store_chunk_works() {
|
||||
|
||||
overseer_send(&mut virtual_overseer, query_chunk.into()).await;
|
||||
|
||||
assert_eq!(rx.await.unwrap().unwrap(), chunk);
|
||||
assert!(rx.await.unwrap().is_none());
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn query_chunk_checks_meta() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
test_harness(TestState::default(), store.clone(), |test_harness| async move {
|
||||
let TestHarness { mut virtual_overseer } = test_harness;
|
||||
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
|
||||
let validator_index = 5;
|
||||
let n_validators = 10;
|
||||
|
||||
// Ensure an entry already exists. In reality this would come from watching
|
||||
// chain events.
|
||||
with_tx(&store, |tx| {
|
||||
super::write_meta(tx, &candidate_hash, &CandidateMeta {
|
||||
data_available: false,
|
||||
chunks_stored: {
|
||||
let mut v = bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators];
|
||||
v.set(validator_index as usize, true);
|
||||
v
|
||||
},
|
||||
state: State::Unavailable(BETimestamp(0)),
|
||||
});
|
||||
});
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let query_chunk = AvailabilityStoreMessage::QueryChunkAvailability(
|
||||
candidate_hash,
|
||||
validator_index,
|
||||
tx,
|
||||
);
|
||||
|
||||
overseer_send(&mut virtual_overseer, query_chunk.into()).await;
|
||||
assert!(rx.await.unwrap());
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let query_chunk = AvailabilityStoreMessage::QueryChunkAvailability(
|
||||
candidate_hash,
|
||||
validator_index + 1,
|
||||
tx,
|
||||
);
|
||||
|
||||
overseer_send(&mut virtual_overseer, query_chunk.into()).await;
|
||||
assert!(!rx.await.unwrap());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -271,7 +407,7 @@ fn store_chunk_works() {
|
||||
fn store_block_works() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let test_state = TestState::default();
|
||||
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
|
||||
test_harness(test_state.clone(), store.clone(), |test_harness| async move {
|
||||
let TestHarness { mut virtual_overseer } = test_harness;
|
||||
let candidate_hash = CandidateHash(Hash::repeat_byte(1));
|
||||
let validator_index = 5;
|
||||
@@ -283,7 +419,7 @@ fn store_block_works() {
|
||||
|
||||
let available_data = AvailableData {
|
||||
pov: Arc::new(pov),
|
||||
validation_data: test_state.persisted_validation_data,
|
||||
validation_data: test_state.persisted_validation_data.clone(),
|
||||
};
|
||||
|
||||
|
||||
@@ -319,13 +455,12 @@ fn store_block_works() {
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn store_pov_and_query_chunk_works() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
|
||||
test_harness(test_state.clone(), store.clone(), |test_harness| async move {
|
||||
let TestHarness { mut virtual_overseer } = test_harness;
|
||||
let candidate_hash = CandidateHash(Hash::repeat_byte(1));
|
||||
let n_validators = 10;
|
||||
@@ -336,11 +471,10 @@ fn store_pov_and_query_chunk_works() {
|
||||
|
||||
let available_data = AvailableData {
|
||||
pov: Arc::new(pov),
|
||||
validation_data: test_state.persisted_validation_data,
|
||||
validation_data: test_state.persisted_validation_data.clone(),
|
||||
};
|
||||
|
||||
let no_metrics = Metrics(None);
|
||||
let chunks_expected = get_chunks(&available_data, n_validators as usize, &no_metrics).unwrap();
|
||||
let chunks_expected = erasure::obtain_chunks_v1(n_validators as _, &available_data).unwrap();
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||
@@ -358,71 +492,17 @@ fn store_pov_and_query_chunk_works() {
|
||||
for validator_index in 0..n_validators {
|
||||
let chunk = query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap();
|
||||
|
||||
assert_eq!(chunk, chunks_expected[validator_index as usize]);
|
||||
assert_eq!(chunk.chunk, chunks_expected[validator_index as usize]);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stored_but_not_included_chunk_is_pruned() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
|
||||
let TestHarness { mut virtual_overseer } = test_harness;
|
||||
let candidate_hash = CandidateHash(Hash::repeat_byte(1));
|
||||
let relay_parent = Hash::repeat_byte(2);
|
||||
let validator_index = 5;
|
||||
|
||||
let chunk = ErasureChunk {
|
||||
chunk: vec![1, 2, 3],
|
||||
index: validator_index,
|
||||
proof: vec![vec![3, 4, 5]],
|
||||
};
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let chunk_msg = AvailabilityStoreMessage::StoreChunk {
|
||||
candidate_hash,
|
||||
relay_parent,
|
||||
chunk: chunk.clone(),
|
||||
tx,
|
||||
};
|
||||
|
||||
overseer_send(&mut virtual_overseer, chunk_msg.into()).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
|
||||
hash,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(hash, relay_parent);
|
||||
tx.send(Ok(Some(4))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
rx.await.unwrap().unwrap();
|
||||
|
||||
// At this point data should be in the store.
|
||||
assert_eq!(
|
||||
query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(),
|
||||
chunk,
|
||||
);
|
||||
|
||||
// Wait for twice as long as the stored block kept for.
|
||||
Delay::new(test_state.pruning_config.keep_stored_block_for * 2).await;
|
||||
|
||||
// The block was not included by this point so it should be pruned now.
|
||||
assert!(query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.is_none());
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stored_but_not_included_data_is_pruned() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
|
||||
test_harness(test_state.clone(), store.clone(), |test_harness| async move {
|
||||
let TestHarness { mut virtual_overseer } = test_harness;
|
||||
let candidate_hash = CandidateHash(Hash::repeat_byte(1));
|
||||
let n_validators = 10;
|
||||
@@ -433,7 +513,7 @@ fn stored_but_not_included_data_is_pruned() {
|
||||
|
||||
let available_data = AvailableData {
|
||||
pov: Arc::new(pov),
|
||||
validation_data: test_state.persisted_validation_data,
|
||||
validation_data: test_state.persisted_validation_data.clone(),
|
||||
};
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
@@ -455,8 +535,9 @@ fn stored_but_not_included_data_is_pruned() {
|
||||
available_data,
|
||||
);
|
||||
|
||||
// Wait for twice as long as the stored block kept for.
|
||||
Delay::new(test_state.pruning_config.keep_stored_block_for * 2).await;
|
||||
// Wait until pruning.
|
||||
test_state.clock.inc(test_state.pruning_config.keep_unavailable_for);
|
||||
test_state.wait_for_pruning().await;
|
||||
|
||||
// The block was not included by this point so it should be pruned now.
|
||||
assert!(query_available_data(&mut virtual_overseer, candidate_hash).await.is_none());
|
||||
@@ -468,7 +549,7 @@ fn stored_data_kept_until_finalized() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
|
||||
test_harness(test_state.clone(), store.clone(), |test_harness| async move {
|
||||
let TestHarness { mut virtual_overseer } = test_harness;
|
||||
let n_validators = 10;
|
||||
|
||||
@@ -487,9 +568,12 @@ fn stored_data_kept_until_finalized() {
|
||||
|
||||
let available_data = AvailableData {
|
||||
pov: Arc::new(pov),
|
||||
validation_data: test_state.persisted_validation_data,
|
||||
validation_data: test_state.persisted_validation_data.clone(),
|
||||
};
|
||||
|
||||
let parent = Hash::repeat_byte(2);
|
||||
let block_number = 10;
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||
candidate_hash,
|
||||
@@ -509,29 +593,17 @@ fn stored_data_kept_until_finalized() {
|
||||
available_data,
|
||||
);
|
||||
|
||||
let new_leaf = Hash::repeat_byte(2);
|
||||
overseer_signal(
|
||||
let new_leaf = import_leaf(
|
||||
&mut virtual_overseer,
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(),
|
||||
deactivated: vec![].into(),
|
||||
}),
|
||||
parent,
|
||||
block_number,
|
||||
vec![CandidateEvent::CandidateIncluded(candidate, HeadData::default())],
|
||||
(0..n_validators).map(|_| Sr25519Keyring::Alice.public().into()).collect(),
|
||||
).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::CandidateEvents(tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, new_leaf);
|
||||
tx.send(Ok(vec![
|
||||
CandidateEvent::CandidateIncluded(candidate, HeadData::default()),
|
||||
])).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
Delay::new(test_state.pruning_config.keep_stored_block_for * 10).await;
|
||||
// Wait until unavailable data would definitely be pruned.
|
||||
test_state.clock.inc(test_state.pruning_config.keep_unavailable_for * 10);
|
||||
test_state.wait_for_pruning().await;
|
||||
|
||||
// At this point data should _still_ be in the store.
|
||||
assert_eq!(
|
||||
@@ -539,13 +611,18 @@ fn stored_data_kept_until_finalized() {
|
||||
available_data,
|
||||
);
|
||||
|
||||
assert!(
|
||||
query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await
|
||||
);
|
||||
|
||||
overseer_signal(
|
||||
&mut virtual_overseer,
|
||||
OverseerSignal::BlockFinalized(new_leaf, 10)
|
||||
OverseerSignal::BlockFinalized(new_leaf, block_number)
|
||||
).await;
|
||||
|
||||
// Wait for a half of the time finalized data should be available for
|
||||
Delay::new(test_state.pruning_config.keep_finalized_block_for / 2).await;
|
||||
// Wait until unavailable data would definitely be pruned.
|
||||
test_state.clock.inc(test_state.pruning_config.keep_finalized_for / 2);
|
||||
test_state.wait_for_pruning().await;
|
||||
|
||||
// At this point data should _still_ be in the store.
|
||||
assert_eq!(
|
||||
@@ -553,115 +630,21 @@ fn stored_data_kept_until_finalized() {
|
||||
available_data,
|
||||
);
|
||||
|
||||
// Wait until it is should be gone.
|
||||
Delay::new(test_state.pruning_config.keep_finalized_block_for).await;
|
||||
assert!(
|
||||
query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await
|
||||
);
|
||||
|
||||
// Wait until it definitely should be gone.
|
||||
test_state.clock.inc(test_state.pruning_config.keep_finalized_for);
|
||||
test_state.wait_for_pruning().await;
|
||||
|
||||
// At this point data should be gone from the store.
|
||||
assert!(
|
||||
query_available_data(&mut virtual_overseer, candidate_hash).await.is_none(),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stored_chunk_kept_until_finalized() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
|
||||
let TestHarness { mut virtual_overseer } = test_harness;
|
||||
let relay_parent = Hash::repeat_byte(2);
|
||||
let validator_index = 5;
|
||||
let candidate = TestCandidateBuilder {
|
||||
..Default::default()
|
||||
}.build();
|
||||
let candidate_hash = candidate.hash();
|
||||
|
||||
let chunk = ErasureChunk {
|
||||
chunk: vec![1, 2, 3],
|
||||
index: validator_index,
|
||||
proof: vec![vec![3, 4, 5]],
|
||||
};
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let chunk_msg = AvailabilityStoreMessage::StoreChunk {
|
||||
candidate_hash,
|
||||
relay_parent,
|
||||
chunk: chunk.clone(),
|
||||
tx,
|
||||
};
|
||||
|
||||
overseer_send(&mut virtual_overseer, chunk_msg.into()).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
|
||||
hash,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(hash, relay_parent);
|
||||
tx.send(Ok(Some(4))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
rx.await.unwrap().unwrap();
|
||||
|
||||
// At this point data should be in the store.
|
||||
assert_eq!(
|
||||
query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(),
|
||||
chunk,
|
||||
);
|
||||
|
||||
let new_leaf = Hash::repeat_byte(2);
|
||||
overseer_signal(
|
||||
&mut virtual_overseer,
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(),
|
||||
deactivated: vec![].into(),
|
||||
}),
|
||||
).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::CandidateEvents(tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, new_leaf);
|
||||
tx.send(Ok(vec![
|
||||
CandidateEvent::CandidateIncluded(candidate, HeadData::default()),
|
||||
])).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
Delay::new(test_state.pruning_config.keep_stored_block_for * 10).await;
|
||||
|
||||
// At this point data should _still_ be in the store.
|
||||
assert_eq!(
|
||||
query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(),
|
||||
chunk,
|
||||
);
|
||||
|
||||
overseer_signal(
|
||||
&mut virtual_overseer,
|
||||
OverseerSignal::BlockFinalized(new_leaf, 10)
|
||||
).await;
|
||||
|
||||
// Wait for a half of the time finalized data should be available for
|
||||
Delay::new(test_state.pruning_config.keep_finalized_block_for / 2).await;
|
||||
|
||||
// At this point data should _still_ be in the store.
|
||||
assert_eq!(
|
||||
query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(),
|
||||
chunk,
|
||||
);
|
||||
|
||||
// Wait until it is should be gone.
|
||||
Delay::new(test_state.pruning_config.keep_finalized_chunk_for).await;
|
||||
|
||||
// At this point data should be gone from the store.
|
||||
assert!(
|
||||
query_available_data(&mut virtual_overseer, candidate_hash).await.is_none(),
|
||||
query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, false).await
|
||||
);
|
||||
});
|
||||
}
|
||||
@@ -671,9 +654,14 @@ fn forkfullness_works() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
|
||||
test_harness(test_state.clone(), store.clone(), |test_harness| async move {
|
||||
let TestHarness { mut virtual_overseer } = test_harness;
|
||||
let n_validators = 10;
|
||||
let block_number_1 = 5;
|
||||
let block_number_2 = 5;
|
||||
let validators: Vec<_> = (0..n_validators).map(|_| Sr25519Keyring::Alice.public().into()).collect();
|
||||
let parent_1 = Hash::repeat_byte(3);
|
||||
let parent_2 = Hash::repeat_byte(4);
|
||||
|
||||
let pov_1 = PoV {
|
||||
block_data: BlockData(vec![1, 2, 3]),
|
||||
@@ -708,7 +696,7 @@ fn forkfullness_works() {
|
||||
|
||||
let available_data_2 = AvailableData {
|
||||
pov: Arc::new(pov_2),
|
||||
validation_data: test_state.persisted_validation_data,
|
||||
validation_data: test_state.persisted_validation_data.clone(),
|
||||
};
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
@@ -747,47 +735,25 @@ fn forkfullness_works() {
|
||||
available_data_2,
|
||||
);
|
||||
|
||||
|
||||
let new_leaf_1 = Hash::repeat_byte(2);
|
||||
let new_leaf_2 = Hash::repeat_byte(3);
|
||||
|
||||
overseer_signal(
|
||||
let new_leaf_1 = import_leaf(
|
||||
&mut virtual_overseer,
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated: vec![(new_leaf_1, Arc::new(JaegerSpan::Disabled)), (new_leaf_2, Arc::new(JaegerSpan::Disabled))].into(),
|
||||
deactivated: vec![].into(),
|
||||
}),
|
||||
parent_1,
|
||||
block_number_1,
|
||||
vec![CandidateEvent::CandidateIncluded(candidate_1, HeadData::default())],
|
||||
validators.clone(),
|
||||
).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
leaf,
|
||||
RuntimeApiRequest::CandidateEvents(tx),
|
||||
)) => {
|
||||
assert_eq!(leaf, new_leaf_1);
|
||||
tx.send(Ok(vec![
|
||||
CandidateEvent::CandidateIncluded(candidate_1, HeadData::default()),
|
||||
])).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
leaf,
|
||||
RuntimeApiRequest::CandidateEvents(tx),
|
||||
)) => {
|
||||
assert_eq!(leaf, new_leaf_2);
|
||||
tx.send(Ok(vec![
|
||||
CandidateEvent::CandidateIncluded(candidate_2, HeadData::default()),
|
||||
])).unwrap();
|
||||
}
|
||||
);
|
||||
let _new_leaf_2 = import_leaf(
|
||||
&mut virtual_overseer,
|
||||
parent_2,
|
||||
block_number_2,
|
||||
vec![CandidateEvent::CandidateIncluded(candidate_2, HeadData::default())],
|
||||
validators.clone(),
|
||||
).await;
|
||||
|
||||
overseer_signal(
|
||||
&mut virtual_overseer,
|
||||
OverseerSignal::BlockFinalized(new_leaf_1, 5)
|
||||
OverseerSignal::BlockFinalized(new_leaf_1, block_number_1)
|
||||
).await;
|
||||
|
||||
// Data of both candidates should be still present in the DB.
|
||||
@@ -800,10 +766,41 @@ fn forkfullness_works() {
|
||||
query_available_data(&mut virtual_overseer, candidate_2_hash).await.unwrap(),
|
||||
available_data_2,
|
||||
);
|
||||
// Wait for longer than finalized blocks should be kept for
|
||||
Delay::new(test_state.pruning_config.keep_finalized_block_for + Duration::from_secs(1)).await;
|
||||
|
||||
// Data of both candidates should be gone now.
|
||||
assert!(
|
||||
query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await,
|
||||
);
|
||||
|
||||
assert!(
|
||||
query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, true).await,
|
||||
);
|
||||
|
||||
// Candidate 2 should now be considered unavailable and will be pruned.
|
||||
test_state.clock.inc(test_state.pruning_config.keep_unavailable_for);
|
||||
test_state.wait_for_pruning().await;
|
||||
|
||||
assert_eq!(
|
||||
query_available_data(&mut virtual_overseer, candidate_1_hash).await.unwrap(),
|
||||
available_data_1,
|
||||
);
|
||||
|
||||
assert!(
|
||||
query_available_data(&mut virtual_overseer, candidate_2_hash).await.is_none(),
|
||||
);
|
||||
|
||||
assert!(
|
||||
query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await,
|
||||
);
|
||||
|
||||
assert!(
|
||||
query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await,
|
||||
);
|
||||
|
||||
// Wait for longer than finalized blocks should be kept for
|
||||
test_state.clock.inc(test_state.pruning_config.keep_finalized_for);
|
||||
test_state.wait_for_pruning().await;
|
||||
|
||||
// Everything should be pruned now.
|
||||
assert!(
|
||||
query_available_data(&mut virtual_overseer, candidate_1_hash).await.is_none(),
|
||||
);
|
||||
@@ -811,6 +808,14 @@ fn forkfullness_works() {
|
||||
assert!(
|
||||
query_available_data(&mut virtual_overseer, candidate_2_hash).await.is_none(),
|
||||
);
|
||||
|
||||
assert!(
|
||||
query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, false).await,
|
||||
);
|
||||
|
||||
assert!(
|
||||
query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -838,3 +843,88 @@ async fn query_chunk(
|
||||
|
||||
rx.await.unwrap()
|
||||
}
|
||||
|
||||
async fn query_all_chunks(
|
||||
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
|
||||
candidate_hash: CandidateHash,
|
||||
n_validators: u32,
|
||||
expect_present: bool,
|
||||
) -> bool {
|
||||
for i in 0..n_validators {
|
||||
if query_chunk(virtual_overseer, candidate_hash, i).await.is_some() != expect_present {
|
||||
return false
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
async fn import_leaf(
|
||||
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
|
||||
parent_hash: Hash,
|
||||
block_number: BlockNumber,
|
||||
events: Vec<CandidateEvent>,
|
||||
validators: Vec<ValidatorId>,
|
||||
) -> Hash {
|
||||
let header = Header {
|
||||
parent_hash,
|
||||
number: block_number,
|
||||
state_root: Hash::zero(),
|
||||
extrinsics_root: Hash::zero(),
|
||||
digest: Default::default(),
|
||||
};
|
||||
let new_leaf = header.hash();
|
||||
|
||||
overseer_signal(
|
||||
virtual_overseer,
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(),
|
||||
deactivated: vec![].into(),
|
||||
}),
|
||||
).await;
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::CandidateEvents(tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, new_leaf);
|
||||
tx.send(Ok(events)).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
|
||||
relay_parent,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(relay_parent, new_leaf);
|
||||
tx.send(Ok(Some(block_number))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::ChainApi(ChainApiMessage::BlockHeader(
|
||||
relay_parent,
|
||||
tx,
|
||||
)) => {
|
||||
assert_eq!(relay_parent, new_leaf);
|
||||
tx.send(Ok(Some(header))).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
relay_parent,
|
||||
RuntimeApiRequest::Validators(tx),
|
||||
)) => {
|
||||
assert_eq!(relay_parent, parent_hash);
|
||||
tx.send(Ok(validators)).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
new_leaf
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user