mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 05:51:02 +00:00
Availability store pruning (#1820)
* Initial commit * Move tests to separate module * Move timestamps to the newtype * Change idx name * Use Duration for consts and update chunk records * Ordering::Equal * Update node/core/av-store/src/lib.rs Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com> * put_ methods do the array sorting * Fix get_block_number * Change StoreChunk message type and relay parent method * Add chunk tests * Fix block number computation for StoreChunk * Duration instead of u64 everywhere * Add a clarifying comment Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
This commit is contained in:
Generated
+3
@@ -4932,7 +4932,9 @@ version = "0.1.0"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"assert_matches",
|
"assert_matches",
|
||||||
"derive_more 0.99.11",
|
"derive_more 0.99.11",
|
||||||
|
"env_logger",
|
||||||
"futures 0.3.5",
|
"futures 0.3.5",
|
||||||
|
"futures-timer 3.0.2",
|
||||||
"kvdb",
|
"kvdb",
|
||||||
"kvdb-memorydb",
|
"kvdb-memorydb",
|
||||||
"kvdb-rocksdb",
|
"kvdb-rocksdb",
|
||||||
@@ -4944,6 +4946,7 @@ dependencies = [
|
|||||||
"polkadot-node-subsystem-util",
|
"polkadot-node-subsystem-util",
|
||||||
"polkadot-overseer",
|
"polkadot-overseer",
|
||||||
"polkadot-primitives",
|
"polkadot-primitives",
|
||||||
|
"smallvec 1.4.2",
|
||||||
"sp-core",
|
"sp-core",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@@ -7,21 +7,24 @@ edition = "2018"
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
derive_more = "0.99.9"
|
derive_more = "0.99.9"
|
||||||
futures = "0.3.5"
|
futures = "0.3.5"
|
||||||
log = "0.4.8"
|
futures-timer = "3.0.2"
|
||||||
|
|
||||||
kvdb = "0.7.0"
|
kvdb = "0.7.0"
|
||||||
kvdb-rocksdb = "0.9.1"
|
kvdb-rocksdb = "0.9.1"
|
||||||
codec = { package = "parity-scale-codec", version = "1.3.1", features = ["derive"] }
|
log = "0.4.8"
|
||||||
|
|
||||||
|
codec = { package = "parity-scale-codec", version = "1.3.1", features = ["derive"] }
|
||||||
erasure = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" }
|
erasure = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" }
|
||||||
polkadot-overseer = { path = "../../overseer" }
|
|
||||||
polkadot-primitives = { path = "../../../primitives" }
|
|
||||||
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
|
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
|
||||||
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
|
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
|
||||||
|
polkadot-overseer = { path = "../../overseer" }
|
||||||
|
polkadot-primitives = { path = "../../../primitives" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
env_logger = "0.7.1"
|
||||||
futures = { version = "0.3.5", features = ["thread-pool"] }
|
|
||||||
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
|
|
||||||
kvdb-memorydb = "0.7.0"
|
|
||||||
assert_matches = "1.3.0"
|
assert_matches = "1.3.0"
|
||||||
|
smallvec = "1.4.2"
|
||||||
|
kvdb-memorydb = "0.7.0"
|
||||||
|
|
||||||
|
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
|
||||||
|
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,828 @@
|
|||||||
|
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Polkadot.
|
||||||
|
|
||||||
|
// Polkadot is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Polkadot is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
use assert_matches::assert_matches;
|
||||||
|
use futures::{
|
||||||
|
future,
|
||||||
|
channel::oneshot,
|
||||||
|
executor,
|
||||||
|
Future,
|
||||||
|
};
|
||||||
|
use smallvec::smallvec;
|
||||||
|
|
||||||
|
use polkadot_primitives::v1::{
|
||||||
|
AvailableData, BlockData, CandidateDescriptor, CandidateReceipt, HeadData,
|
||||||
|
PersistedValidationData, PoV, Id as ParaId,
|
||||||
|
};
|
||||||
|
use polkadot_node_subsystem_util::TimeoutExt;
|
||||||
|
use polkadot_subsystem::ActiveLeavesUpdate;
|
||||||
|
use polkadot_node_subsystem_test_helpers as test_helpers;
|
||||||
|
|
||||||
|
struct TestHarness {
|
||||||
|
virtual_overseer: test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct TestCandidateBuilder {
|
||||||
|
para_id: ParaId,
|
||||||
|
pov_hash: Hash,
|
||||||
|
relay_parent: Hash,
|
||||||
|
commitments_hash: Hash,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TestCandidateBuilder {
|
||||||
|
fn build(self) -> CandidateReceipt {
|
||||||
|
CandidateReceipt {
|
||||||
|
descriptor: CandidateDescriptor {
|
||||||
|
para_id: self.para_id,
|
||||||
|
pov_hash: self.pov_hash,
|
||||||
|
relay_parent: self.relay_parent,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
commitments_hash: self.commitments_hash,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct TestState {
|
||||||
|
persisted_validation_data: PersistedValidationData,
|
||||||
|
pruning_config: PruningConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for TestState {
|
||||||
|
fn default() -> Self {
|
||||||
|
let persisted_validation_data = PersistedValidationData {
|
||||||
|
parent_head: HeadData(vec![7, 8, 9]),
|
||||||
|
block_number: 5,
|
||||||
|
hrmp_mqc_heads: Vec::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let pruning_config = PruningConfig {
|
||||||
|
keep_stored_block_for: Duration::from_secs(1),
|
||||||
|
keep_finalized_block_for: Duration::from_secs(2),
|
||||||
|
keep_finalized_chunk_for: Duration::from_secs(2),
|
||||||
|
};
|
||||||
|
|
||||||
|
Self {
|
||||||
|
persisted_validation_data,
|
||||||
|
pruning_config,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_harness<T: Future<Output=()>>(
|
||||||
|
pruning_config: PruningConfig,
|
||||||
|
store: Arc<dyn KeyValueDB>,
|
||||||
|
test: impl FnOnce(TestHarness) -> T,
|
||||||
|
) {
|
||||||
|
let _ = env_logger::builder()
|
||||||
|
.is_test(true)
|
||||||
|
.filter(
|
||||||
|
Some("polkadot_node_core_av_store"),
|
||||||
|
log::LevelFilter::Trace,
|
||||||
|
)
|
||||||
|
.filter(
|
||||||
|
Some(LOG_TARGET),
|
||||||
|
log::LevelFilter::Trace,
|
||||||
|
)
|
||||||
|
.try_init();
|
||||||
|
|
||||||
|
let pool = sp_core::testing::TaskExecutor::new();
|
||||||
|
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
|
||||||
|
|
||||||
|
let subsystem = AvailabilityStoreSubsystem::new_in_memory(store, pruning_config);
|
||||||
|
let subsystem = run(subsystem, context);
|
||||||
|
|
||||||
|
let test_fut = test(TestHarness {
|
||||||
|
virtual_overseer,
|
||||||
|
});
|
||||||
|
|
||||||
|
futures::pin_mut!(test_fut);
|
||||||
|
futures::pin_mut!(subsystem);
|
||||||
|
|
||||||
|
executor::block_on(future::select(test_fut, subsystem));
|
||||||
|
}
|
||||||
|
|
||||||
|
const TIMEOUT: Duration = Duration::from_millis(100);
|
||||||
|
|
||||||
|
async fn overseer_send(
|
||||||
|
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
|
||||||
|
msg: AvailabilityStoreMessage,
|
||||||
|
) {
|
||||||
|
log::trace!("Sending message:\n{:?}", &msg);
|
||||||
|
overseer
|
||||||
|
.send(FromOverseer::Communication { msg })
|
||||||
|
.timeout(TIMEOUT)
|
||||||
|
.await
|
||||||
|
.expect(&format!("{:?} is more than enough for sending messages.", TIMEOUT));
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn overseer_recv(
|
||||||
|
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
|
||||||
|
) -> AllMessages {
|
||||||
|
let msg = overseer_recv_with_timeout(overseer, TIMEOUT)
|
||||||
|
.await
|
||||||
|
.expect(&format!("{:?} is more than enough to receive messages", TIMEOUT));
|
||||||
|
|
||||||
|
log::trace!("Received message:\n{:?}", &msg);
|
||||||
|
|
||||||
|
msg
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn overseer_recv_with_timeout(
|
||||||
|
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
|
||||||
|
timeout: Duration,
|
||||||
|
) -> Option<AllMessages> {
|
||||||
|
log::trace!("Waiting for message...");
|
||||||
|
overseer
|
||||||
|
.recv()
|
||||||
|
.timeout(timeout)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn overseer_signal(
|
||||||
|
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
|
||||||
|
signal: OverseerSignal,
|
||||||
|
) {
|
||||||
|
overseer
|
||||||
|
.send(FromOverseer::Signal(signal))
|
||||||
|
.timeout(TIMEOUT)
|
||||||
|
.await
|
||||||
|
.expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn store_chunk_works() {
|
||||||
|
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||||
|
test_harness(PruningConfig::default(), store.clone(), |test_harness| async move {
|
||||||
|
let TestHarness { mut virtual_overseer } = test_harness;
|
||||||
|
let relay_parent = Hash::repeat_byte(32);
|
||||||
|
let candidate_hash = Hash::repeat_byte(33);
|
||||||
|
let validator_index = 5;
|
||||||
|
|
||||||
|
let chunk = ErasureChunk {
|
||||||
|
chunk: vec![1, 2, 3],
|
||||||
|
index: validator_index,
|
||||||
|
proof: vec![vec![3, 4, 5]],
|
||||||
|
};
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let chunk_msg = AvailabilityStoreMessage::StoreChunk {
|
||||||
|
candidate_hash,
|
||||||
|
relay_parent,
|
||||||
|
validator_index,
|
||||||
|
chunk: chunk.clone(),
|
||||||
|
tx,
|
||||||
|
};
|
||||||
|
|
||||||
|
overseer_send(&mut virtual_overseer, chunk_msg.into()).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
|
||||||
|
hash,
|
||||||
|
tx,
|
||||||
|
)) => {
|
||||||
|
assert_eq!(hash, relay_parent);
|
||||||
|
tx.send(Ok(Some(4))).unwrap();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(rx.await.unwrap(), Ok(()));
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let query_chunk = AvailabilityStoreMessage::QueryChunk(
|
||||||
|
candidate_hash,
|
||||||
|
validator_index,
|
||||||
|
tx,
|
||||||
|
);
|
||||||
|
|
||||||
|
overseer_send(&mut virtual_overseer, query_chunk.into()).await;
|
||||||
|
|
||||||
|
assert_eq!(rx.await.unwrap().unwrap(), chunk);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn store_block_works() {
|
||||||
|
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||||
|
let test_state = TestState::default();
|
||||||
|
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
|
||||||
|
let TestHarness { mut virtual_overseer } = test_harness;
|
||||||
|
let candidate_hash = Hash::from([1; 32]);
|
||||||
|
let validator_index = 5;
|
||||||
|
let n_validators = 10;
|
||||||
|
|
||||||
|
let pov = PoV {
|
||||||
|
block_data: BlockData(vec![4, 5, 6]),
|
||||||
|
};
|
||||||
|
|
||||||
|
let available_data = AvailableData {
|
||||||
|
pov,
|
||||||
|
validation_data: test_state.persisted_validation_data,
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||||
|
candidate_hash,
|
||||||
|
Some(validator_index),
|
||||||
|
n_validators,
|
||||||
|
available_data.clone(),
|
||||||
|
tx,
|
||||||
|
);
|
||||||
|
|
||||||
|
virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await;
|
||||||
|
assert_eq!(rx.await.unwrap(), Ok(()));
|
||||||
|
|
||||||
|
let pov = query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap();
|
||||||
|
assert_eq!(pov, available_data);
|
||||||
|
|
||||||
|
let chunk = query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap();
|
||||||
|
|
||||||
|
let chunks = erasure::obtain_chunks_v1(10, &available_data).unwrap();
|
||||||
|
|
||||||
|
let mut branches = erasure::branches(chunks.as_ref());
|
||||||
|
|
||||||
|
let branch = branches.nth(5).unwrap();
|
||||||
|
let expected_chunk = ErasureChunk {
|
||||||
|
chunk: branch.1.to_vec(),
|
||||||
|
index: 5,
|
||||||
|
proof: branch.0,
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(chunk, expected_chunk);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn store_pov_and_query_chunk_works() {
|
||||||
|
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||||
|
let test_state = TestState::default();
|
||||||
|
|
||||||
|
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
|
||||||
|
let TestHarness { mut virtual_overseer } = test_harness;
|
||||||
|
let candidate_hash = Hash::from([1; 32]);
|
||||||
|
let n_validators = 10;
|
||||||
|
|
||||||
|
let pov = PoV {
|
||||||
|
block_data: BlockData(vec![4, 5, 6]),
|
||||||
|
};
|
||||||
|
|
||||||
|
let available_data = AvailableData {
|
||||||
|
pov,
|
||||||
|
validation_data: test_state.persisted_validation_data,
|
||||||
|
};
|
||||||
|
|
||||||
|
let no_metrics = Metrics(None);
|
||||||
|
let chunks_expected = get_chunks(&available_data, n_validators as usize, &no_metrics).unwrap();
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||||
|
candidate_hash,
|
||||||
|
None,
|
||||||
|
n_validators,
|
||||||
|
available_data,
|
||||||
|
tx,
|
||||||
|
);
|
||||||
|
|
||||||
|
virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await;
|
||||||
|
|
||||||
|
assert_eq!(rx.await.unwrap(), Ok(()));
|
||||||
|
|
||||||
|
for validator_index in 0..n_validators {
|
||||||
|
let chunk = query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(chunk, chunks_expected[validator_index as usize]);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stored_but_not_included_chunk_is_pruned() {
|
||||||
|
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||||
|
let test_state = TestState::default();
|
||||||
|
|
||||||
|
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
|
||||||
|
let TestHarness { mut virtual_overseer } = test_harness;
|
||||||
|
let candidate_hash = Hash::repeat_byte(1);
|
||||||
|
let relay_parent = Hash::repeat_byte(2);
|
||||||
|
let validator_index = 5;
|
||||||
|
|
||||||
|
let chunk = ErasureChunk {
|
||||||
|
chunk: vec![1, 2, 3],
|
||||||
|
index: validator_index,
|
||||||
|
proof: vec![vec![3, 4, 5]],
|
||||||
|
};
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let chunk_msg = AvailabilityStoreMessage::StoreChunk {
|
||||||
|
candidate_hash,
|
||||||
|
relay_parent,
|
||||||
|
validator_index,
|
||||||
|
chunk: chunk.clone(),
|
||||||
|
tx,
|
||||||
|
};
|
||||||
|
|
||||||
|
overseer_send(&mut virtual_overseer, chunk_msg.into()).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
|
||||||
|
hash,
|
||||||
|
tx,
|
||||||
|
)) => {
|
||||||
|
assert_eq!(hash, relay_parent);
|
||||||
|
tx.send(Ok(Some(4))).unwrap();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
rx.await.unwrap().unwrap();
|
||||||
|
|
||||||
|
// At this point data should be in the store.
|
||||||
|
assert_eq!(
|
||||||
|
query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(),
|
||||||
|
chunk,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait for twice as long as the stored block kept for.
|
||||||
|
Delay::new(test_state.pruning_config.keep_stored_block_for * 2).await;
|
||||||
|
|
||||||
|
// The block was not included by this point so it should be pruned now.
|
||||||
|
assert!(query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.is_none());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stored_but_not_included_data_is_pruned() {
|
||||||
|
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||||
|
let test_state = TestState::default();
|
||||||
|
|
||||||
|
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
|
||||||
|
let TestHarness { mut virtual_overseer } = test_harness;
|
||||||
|
let candidate_hash = Hash::repeat_byte(1);
|
||||||
|
let n_validators = 10;
|
||||||
|
|
||||||
|
let pov = PoV {
|
||||||
|
block_data: BlockData(vec![4, 5, 6]),
|
||||||
|
};
|
||||||
|
|
||||||
|
let available_data = AvailableData {
|
||||||
|
pov,
|
||||||
|
validation_data: test_state.persisted_validation_data,
|
||||||
|
};
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||||
|
candidate_hash,
|
||||||
|
None,
|
||||||
|
n_validators,
|
||||||
|
available_data.clone(),
|
||||||
|
tx,
|
||||||
|
);
|
||||||
|
|
||||||
|
virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await;
|
||||||
|
|
||||||
|
rx.await.unwrap().unwrap();
|
||||||
|
|
||||||
|
// At this point data should be in the store.
|
||||||
|
assert_eq!(
|
||||||
|
query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(),
|
||||||
|
available_data,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait for twice as long as the stored block kept for.
|
||||||
|
Delay::new(test_state.pruning_config.keep_stored_block_for * 2).await;
|
||||||
|
|
||||||
|
// The block was not included by this point so it should be pruned now.
|
||||||
|
assert!(query_available_data(&mut virtual_overseer, candidate_hash).await.is_none());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stored_data_kept_until_finalized() {
|
||||||
|
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||||
|
let test_state = TestState::default();
|
||||||
|
|
||||||
|
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
|
||||||
|
let TestHarness { mut virtual_overseer } = test_harness;
|
||||||
|
let n_validators = 10;
|
||||||
|
|
||||||
|
let pov = PoV {
|
||||||
|
block_data: BlockData(vec![4, 5, 6]),
|
||||||
|
};
|
||||||
|
|
||||||
|
let pov_hash = pov.hash();
|
||||||
|
|
||||||
|
let candidate = TestCandidateBuilder {
|
||||||
|
pov_hash,
|
||||||
|
..Default::default()
|
||||||
|
}.build();
|
||||||
|
|
||||||
|
let candidate_hash = candidate.hash();
|
||||||
|
|
||||||
|
let available_data = AvailableData {
|
||||||
|
pov,
|
||||||
|
validation_data: test_state.persisted_validation_data,
|
||||||
|
};
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||||
|
candidate_hash,
|
||||||
|
None,
|
||||||
|
n_validators,
|
||||||
|
available_data.clone(),
|
||||||
|
tx,
|
||||||
|
);
|
||||||
|
|
||||||
|
virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await;
|
||||||
|
|
||||||
|
rx.await.unwrap().unwrap();
|
||||||
|
|
||||||
|
// At this point data should be in the store.
|
||||||
|
assert_eq!(
|
||||||
|
query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(),
|
||||||
|
available_data,
|
||||||
|
);
|
||||||
|
|
||||||
|
let new_leaf = Hash::repeat_byte(2);
|
||||||
|
overseer_signal(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
|
activated: smallvec![new_leaf.clone()],
|
||||||
|
deactivated: smallvec![],
|
||||||
|
}),
|
||||||
|
).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||||
|
relay_parent,
|
||||||
|
RuntimeApiRequest::CandidateEvents(tx),
|
||||||
|
)) => {
|
||||||
|
assert_eq!(relay_parent, new_leaf);
|
||||||
|
tx.send(Ok(vec![
|
||||||
|
CandidateEvent::CandidateIncluded(candidate, HeadData::default()),
|
||||||
|
])).unwrap();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
Delay::new(test_state.pruning_config.keep_stored_block_for * 10).await;
|
||||||
|
|
||||||
|
// At this point data should _still_ be in the store.
|
||||||
|
assert_eq!(
|
||||||
|
query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(),
|
||||||
|
available_data,
|
||||||
|
);
|
||||||
|
|
||||||
|
overseer_signal(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
OverseerSignal::BlockFinalized(new_leaf)
|
||||||
|
).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
|
||||||
|
hash,
|
||||||
|
tx,
|
||||||
|
)) => {
|
||||||
|
assert_eq!(hash, new_leaf);
|
||||||
|
tx.send(Ok(Some(10))).unwrap();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait for a half of the time finalized data should be available for
|
||||||
|
Delay::new(test_state.pruning_config.keep_finalized_block_for / 2).await;
|
||||||
|
|
||||||
|
// At this point data should _still_ be in the store.
|
||||||
|
assert_eq!(
|
||||||
|
query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(),
|
||||||
|
available_data,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait until it is should be gone.
|
||||||
|
Delay::new(test_state.pruning_config.keep_finalized_block_for).await;
|
||||||
|
|
||||||
|
// At this point data should be gone from the store.
|
||||||
|
assert!(
|
||||||
|
query_available_data(&mut virtual_overseer, candidate_hash).await.is_none(),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stored_chunk_kept_until_finalized() {
|
||||||
|
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||||
|
let test_state = TestState::default();
|
||||||
|
|
||||||
|
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
|
||||||
|
let TestHarness { mut virtual_overseer } = test_harness;
|
||||||
|
let relay_parent = Hash::repeat_byte(2);
|
||||||
|
let validator_index = 5;
|
||||||
|
let candidate = TestCandidateBuilder {
|
||||||
|
..Default::default()
|
||||||
|
}.build();
|
||||||
|
let candidate_hash = candidate.hash();
|
||||||
|
|
||||||
|
let chunk = ErasureChunk {
|
||||||
|
chunk: vec![1, 2, 3],
|
||||||
|
index: validator_index,
|
||||||
|
proof: vec![vec![3, 4, 5]],
|
||||||
|
};
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let chunk_msg = AvailabilityStoreMessage::StoreChunk {
|
||||||
|
candidate_hash,
|
||||||
|
relay_parent,
|
||||||
|
validator_index,
|
||||||
|
chunk: chunk.clone(),
|
||||||
|
tx,
|
||||||
|
};
|
||||||
|
|
||||||
|
overseer_send(&mut virtual_overseer, chunk_msg.into()).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
|
||||||
|
hash,
|
||||||
|
tx,
|
||||||
|
)) => {
|
||||||
|
assert_eq!(hash, relay_parent);
|
||||||
|
tx.send(Ok(Some(4))).unwrap();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
rx.await.unwrap().unwrap();
|
||||||
|
|
||||||
|
// At this point data should be in the store.
|
||||||
|
assert_eq!(
|
||||||
|
query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(),
|
||||||
|
chunk,
|
||||||
|
);
|
||||||
|
|
||||||
|
let new_leaf = Hash::repeat_byte(2);
|
||||||
|
overseer_signal(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
|
activated: smallvec![new_leaf.clone()],
|
||||||
|
deactivated: smallvec![],
|
||||||
|
}),
|
||||||
|
).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||||
|
relay_parent,
|
||||||
|
RuntimeApiRequest::CandidateEvents(tx),
|
||||||
|
)) => {
|
||||||
|
assert_eq!(relay_parent, new_leaf);
|
||||||
|
tx.send(Ok(vec![
|
||||||
|
CandidateEvent::CandidateIncluded(candidate, HeadData::default()),
|
||||||
|
])).unwrap();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
Delay::new(test_state.pruning_config.keep_stored_block_for * 10).await;
|
||||||
|
|
||||||
|
// At this point data should _still_ be in the store.
|
||||||
|
assert_eq!(
|
||||||
|
query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(),
|
||||||
|
chunk,
|
||||||
|
);
|
||||||
|
|
||||||
|
overseer_signal(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
OverseerSignal::BlockFinalized(new_leaf)
|
||||||
|
).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
|
||||||
|
hash,
|
||||||
|
tx,
|
||||||
|
)) => {
|
||||||
|
assert_eq!(hash, new_leaf);
|
||||||
|
tx.send(Ok(Some(10))).unwrap();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait for a half of the time finalized data should be available for
|
||||||
|
Delay::new(test_state.pruning_config.keep_finalized_block_for / 2).await;
|
||||||
|
|
||||||
|
// At this point data should _still_ be in the store.
|
||||||
|
assert_eq!(
|
||||||
|
query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(),
|
||||||
|
chunk,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait until it is should be gone.
|
||||||
|
Delay::new(test_state.pruning_config.keep_finalized_chunk_for).await;
|
||||||
|
|
||||||
|
// At this point data should be gone from the store.
|
||||||
|
assert!(
|
||||||
|
query_available_data(&mut virtual_overseer, candidate_hash).await.is_none(),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn forkfullness_works() {
|
||||||
|
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||||
|
let test_state = TestState::default();
|
||||||
|
|
||||||
|
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
|
||||||
|
let TestHarness { mut virtual_overseer } = test_harness;
|
||||||
|
let n_validators = 10;
|
||||||
|
|
||||||
|
let pov_1 = PoV {
|
||||||
|
block_data: BlockData(vec![1, 2, 3]),
|
||||||
|
};
|
||||||
|
|
||||||
|
let pov_1_hash = pov_1.hash();
|
||||||
|
|
||||||
|
let pov_2 = PoV {
|
||||||
|
block_data: BlockData(vec![4, 5, 6]),
|
||||||
|
};
|
||||||
|
|
||||||
|
let pov_2_hash = pov_2.hash();
|
||||||
|
|
||||||
|
let candidate_1 = TestCandidateBuilder {
|
||||||
|
pov_hash: pov_1_hash,
|
||||||
|
..Default::default()
|
||||||
|
}.build();
|
||||||
|
|
||||||
|
let candidate_1_hash = candidate_1.hash();
|
||||||
|
|
||||||
|
let candidate_2 = TestCandidateBuilder {
|
||||||
|
pov_hash: pov_2_hash,
|
||||||
|
..Default::default()
|
||||||
|
}.build();
|
||||||
|
|
||||||
|
let candidate_2_hash = candidate_2.hash();
|
||||||
|
|
||||||
|
let available_data_1 = AvailableData {
|
||||||
|
pov: pov_1,
|
||||||
|
validation_data: test_state.persisted_validation_data.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let available_data_2 = AvailableData {
|
||||||
|
pov: pov_2,
|
||||||
|
validation_data: test_state.persisted_validation_data,
|
||||||
|
};
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||||
|
candidate_1_hash,
|
||||||
|
None,
|
||||||
|
n_validators,
|
||||||
|
available_data_1.clone(),
|
||||||
|
tx,
|
||||||
|
);
|
||||||
|
|
||||||
|
virtual_overseer.send(FromOverseer::Communication{ msg }).await;
|
||||||
|
|
||||||
|
rx.await.unwrap().unwrap();
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||||
|
candidate_2_hash,
|
||||||
|
None,
|
||||||
|
n_validators,
|
||||||
|
available_data_2.clone(),
|
||||||
|
tx,
|
||||||
|
);
|
||||||
|
|
||||||
|
virtual_overseer.send(FromOverseer::Communication{ msg }).await;
|
||||||
|
|
||||||
|
rx.await.unwrap().unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
query_available_data(&mut virtual_overseer, candidate_1_hash).await.unwrap(),
|
||||||
|
available_data_1,
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
query_available_data(&mut virtual_overseer, candidate_2_hash).await.unwrap(),
|
||||||
|
available_data_2,
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
let new_leaf_1 = Hash::repeat_byte(2);
|
||||||
|
let new_leaf_2 = Hash::repeat_byte(3);
|
||||||
|
|
||||||
|
overseer_signal(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
|
activated: smallvec![new_leaf_1.clone(), new_leaf_2.clone()],
|
||||||
|
deactivated: smallvec![],
|
||||||
|
}),
|
||||||
|
).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||||
|
leaf,
|
||||||
|
RuntimeApiRequest::CandidateEvents(tx),
|
||||||
|
)) => {
|
||||||
|
assert_eq!(leaf, new_leaf_1);
|
||||||
|
tx.send(Ok(vec![
|
||||||
|
CandidateEvent::CandidateIncluded(candidate_1, HeadData::default()),
|
||||||
|
])).unwrap();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||||
|
leaf,
|
||||||
|
RuntimeApiRequest::CandidateEvents(tx),
|
||||||
|
)) => {
|
||||||
|
assert_eq!(leaf, new_leaf_2);
|
||||||
|
tx.send(Ok(vec![
|
||||||
|
CandidateEvent::CandidateIncluded(candidate_2, HeadData::default()),
|
||||||
|
])).unwrap();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
overseer_signal(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
OverseerSignal::BlockFinalized(new_leaf_1)
|
||||||
|
).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
|
||||||
|
hash,
|
||||||
|
tx,
|
||||||
|
)) => {
|
||||||
|
assert_eq!(hash, new_leaf_1);
|
||||||
|
tx.send(Ok(Some(5))).unwrap();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
// Data of both candidates should be still present in the DB.
|
||||||
|
assert_eq!(
|
||||||
|
query_available_data(&mut virtual_overseer, candidate_1_hash).await.unwrap(),
|
||||||
|
available_data_1,
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
query_available_data(&mut virtual_overseer, candidate_2_hash).await.unwrap(),
|
||||||
|
available_data_2,
|
||||||
|
);
|
||||||
|
// Wait for longer than finalized blocks should be kept for
|
||||||
|
Delay::new(test_state.pruning_config.keep_finalized_block_for + Duration::from_secs(1)).await;
|
||||||
|
|
||||||
|
// Data of both candidates should be gone now.
|
||||||
|
assert!(
|
||||||
|
query_available_data(&mut virtual_overseer, candidate_1_hash).await.is_none(),
|
||||||
|
);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
query_available_data(&mut virtual_overseer, candidate_2_hash).await.is_none(),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn query_available_data(
|
||||||
|
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
|
||||||
|
candidate_hash: Hash,
|
||||||
|
) -> Option<AvailableData> {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let query = AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx);
|
||||||
|
virtual_overseer.send(FromOverseer::Communication{ msg: query }).await;
|
||||||
|
|
||||||
|
rx.await.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn query_chunk(
|
||||||
|
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
|
||||||
|
candidate_hash: Hash,
|
||||||
|
index: u32,
|
||||||
|
) -> Option<ErasureChunk> {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let query = AvailabilityStoreMessage::QueryChunk(candidate_hash, index, tx);
|
||||||
|
virtual_overseer.send(FromOverseer::Communication{ msg: query }).await;
|
||||||
|
|
||||||
|
rx.await.unwrap()
|
||||||
|
}
|
||||||
@@ -661,6 +661,7 @@ where
|
|||||||
if let Err(_e) = store_chunk(
|
if let Err(_e) = store_chunk(
|
||||||
ctx,
|
ctx,
|
||||||
message.candidate_hash.clone(),
|
message.candidate_hash.clone(),
|
||||||
|
live_candidate.descriptor.relay_parent.clone(),
|
||||||
message.erasure_chunk.index,
|
message.erasure_chunk.index,
|
||||||
message.erasure_chunk.clone(),
|
message.erasure_chunk.clone(),
|
||||||
).await? {
|
).await? {
|
||||||
@@ -949,6 +950,7 @@ where
|
|||||||
async fn store_chunk<Context>(
|
async fn store_chunk<Context>(
|
||||||
ctx: &mut Context,
|
ctx: &mut Context,
|
||||||
candidate_hash: Hash,
|
candidate_hash: Hash,
|
||||||
|
relay_parent: Hash,
|
||||||
validator_index: ValidatorIndex,
|
validator_index: ValidatorIndex,
|
||||||
erasure_chunk: ErasureChunk,
|
erasure_chunk: ErasureChunk,
|
||||||
) -> Result<std::result::Result<(), ()>>
|
) -> Result<std::result::Result<(), ()>>
|
||||||
@@ -957,7 +959,13 @@ where
|
|||||||
{
|
{
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
ctx.send_message(AllMessages::AvailabilityStore(
|
ctx.send_message(AllMessages::AvailabilityStore(
|
||||||
AvailabilityStoreMessage::StoreChunk(candidate_hash, validator_index, erasure_chunk, tx),
|
AvailabilityStoreMessage::StoreChunk {
|
||||||
|
candidate_hash,
|
||||||
|
relay_parent,
|
||||||
|
validator_index,
|
||||||
|
chunk: erasure_chunk,
|
||||||
|
tx,
|
||||||
|
}
|
||||||
)).await?;
|
)).await?;
|
||||||
rx.await.map_err::<Error, _>(Into::into)
|
rx.await.map_err::<Error, _>(Into::into)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -305,7 +305,18 @@ pub enum AvailabilityStoreMessage {
|
|||||||
/// Store an `ErasureChunk` in the AV store.
|
/// Store an `ErasureChunk` in the AV store.
|
||||||
///
|
///
|
||||||
/// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed.
|
/// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed.
|
||||||
StoreChunk(Hash, ValidatorIndex, ErasureChunk, oneshot::Sender<Result<(), ()>>),
|
StoreChunk {
|
||||||
|
/// A hash of the candidate this chunk belongs to.
|
||||||
|
candidate_hash: Hash,
|
||||||
|
/// A relevant relay parent.
|
||||||
|
relay_parent: Hash,
|
||||||
|
/// The index of the validator this chunk belongs to.
|
||||||
|
validator_index: ValidatorIndex,
|
||||||
|
/// The chunk itself.
|
||||||
|
chunk: ErasureChunk,
|
||||||
|
/// Sending side of the channel to send result to.
|
||||||
|
tx: oneshot::Sender<Result<(), ()>>,
|
||||||
|
},
|
||||||
|
|
||||||
/// Store a `AvailableData` in the AV store.
|
/// Store a `AvailableData` in the AV store.
|
||||||
/// If `ValidatorIndex` is present store corresponding chunk also.
|
/// If `ValidatorIndex` is present store corresponding chunk also.
|
||||||
@@ -315,15 +326,10 @@ pub enum AvailabilityStoreMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl AvailabilityStoreMessage {
|
impl AvailabilityStoreMessage {
|
||||||
/// If the current variant contains the relay parent hash, return it.
|
/// In fact, none of the AvailabilityStore messages assume a particular relay parent.
|
||||||
pub fn relay_parent(&self) -> Option<Hash> {
|
pub fn relay_parent(&self) -> Option<Hash> {
|
||||||
match self {
|
match self {
|
||||||
Self::QueryAvailableData(hash, _) => Some(*hash),
|
_ => None,
|
||||||
Self::QueryDataAvailability(hash, _) => Some(*hash),
|
|
||||||
Self::QueryChunk(hash, _, _) => Some(*hash),
|
|
||||||
Self::QueryChunkAvailability(hash, _, _) => Some(*hash),
|
|
||||||
Self::StoreChunk(hash, _, _, _) => Some(*hash),
|
|
||||||
Self::StoreAvailableData(hash, _, _, _, _) => Some(*hash),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user