Mirror of https://github.com/pezkuwichain/pezkuwi-subxt.git (synced 2026-05-08 01:58:00 +00:00)
Avoid querying the local validator in availability recovery (#2792)
* guide: don't request availability data from ourselves
* add QueryAllChunks message
* implement QueryAllChunks
* remove unused relay_parent from StoreChunk
* test QueryAllChunks
* fast paths make short roads
* test early exit behavior
Committed by: GitHub
Parent: a960e2ff6d
Commit: 5da762e728
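As a quick orientation to the change summarized above, here is a minimal, self-contained sketch (not the subsystem code itself) of the recovery-side idea: seed chunk recovery with whatever the local availability store already holds and drop those validator indices from the network request queue. The `Chunk` struct and plain `u32` indices are simplified stand-ins for the real `ErasureChunk` / `ValidatorIndex` types.

```rust
// Toy model of "don't request availability data from ourselves":
// anything already in the local store is treated as received and
// removed from the list of validators we would query over the network.
use std::collections::HashMap;

#[derive(Clone, Debug)]
struct Chunk {
    index: u32,    // validator index this chunk belongs to
    data: Vec<u8>, // erasure-coded chunk payload
}

/// Remove locally available chunks from the request shuffling and record them as received.
fn seed_from_local_store(
    local_chunks: Vec<Chunk>,
    shuffling: &mut Vec<u32>,
    received: &mut HashMap<u32, Chunk>,
) {
    let local_indices: Vec<u32> = local_chunks.iter().map(|c| c.index).collect();
    // Never ask the network for a chunk we already have (in particular, our own).
    shuffling.retain(|i| !local_indices.contains(i));
    for chunk in local_chunks {
        received.insert(chunk.index, chunk);
    }
}

fn main() {
    let mut shuffling = vec![3, 0, 2, 1];
    let mut received = HashMap::new();
    // Pretend the local store returned our own chunk (index 0).
    seed_from_local_store(
        vec![Chunk { index: 0, data: vec![1, 2, 3] }],
        &mut shuffling,
        &mut received,
    );
    assert_eq!(shuffling, vec![3, 2, 1]);
    assert!(received.contains_key(&0));
    println!("will request from validators: {:?}", shuffling);
}
```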
@@ -980,6 +980,33 @@ fn process_message(
 			let _timer = subsystem.metrics.time_get_chunk();
 			let _ = tx.send(load_chunk(&subsystem.db, &candidate, validator_index)?);
 		}
+		AvailabilityStoreMessage::QueryAllChunks(candidate, tx) => {
+			match load_meta(&subsystem.db, &candidate)? {
+				None => {
+					let _ = tx.send(Vec::new());
+				}
+				Some(meta) => {
+					let mut chunks = Vec::new();
+
+					for (index, _) in meta.chunks_stored.iter().enumerate().filter(|(_, b)| **b) {
+						let _timer = subsystem.metrics.time_get_chunk();
+						match load_chunk(&subsystem.db, &candidate, ValidatorIndex(index as _))? {
+							Some(c) => chunks.push(c),
+							None => {
+								tracing::warn!(
+									target: LOG_TARGET,
+									?candidate,
+									index,
+									"No chunk found for set bit in meta"
+								);
+							}
+						}
+					}
+
+					let _ = tx.send(chunks);
+				}
+			}
+		}
 		AvailabilityStoreMessage::QueryChunkAvailability(candidate, validator_index, tx) => {
 			let a = load_meta(&subsystem.db, &candidate)?
 				.map_or(false, |m|
@@ -989,7 +1016,6 @@ fn process_message(
 		}
 		AvailabilityStoreMessage::StoreChunk {
 			candidate_hash,
-			relay_parent: _,
 			chunk,
 			tx,
 		} => {
@@ -284,7 +284,6 @@ fn store_chunk_works() {
 	let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
 	test_harness(TestState::default(), store.clone(), |test_harness| async move {
 		let TestHarness { mut virtual_overseer } = test_harness;
-		let relay_parent = Hash::repeat_byte(32);
 		let candidate_hash = CandidateHash(Hash::repeat_byte(33));
 		let validator_index = ValidatorIndex(5);
 		let n_validators = 10;
@@ -309,7 +308,6 @@ fn store_chunk_works() {
 
 		let chunk_msg = AvailabilityStoreMessage::StoreChunk {
 			candidate_hash,
-			relay_parent,
 			chunk: chunk.clone(),
 			tx,
 		};
@@ -336,7 +334,6 @@ fn store_chunk_does_nothing_if_no_entry_already() {
 	let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
 	test_harness(TestState::default(), store.clone(), |test_harness| async move {
 		let TestHarness { mut virtual_overseer } = test_harness;
-		let relay_parent = Hash::repeat_byte(32);
 		let candidate_hash = CandidateHash(Hash::repeat_byte(33));
 		let validator_index = ValidatorIndex(5);
 
@@ -350,7 +347,6 @@ fn store_chunk_does_nothing_if_no_entry_already() {
 
 		let chunk_msg = AvailabilityStoreMessage::StoreChunk {
 			candidate_hash,
-			relay_parent,
 			chunk: chunk.clone(),
 			tx,
 		};
@@ -510,6 +506,98 @@ fn store_pov_and_query_chunk_works() {
 	});
 }
 
+#[test]
+fn query_all_chunks_works() {
+	let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
+	let test_state = TestState::default();
+
+	test_harness(test_state.clone(), store.clone(), |test_harness| async move {
+		let TestHarness { mut virtual_overseer } = test_harness;
+
+		// all chunks for hash 1.
+		// 1 chunk for hash 2.
+		// 0 chunks for hash 3.
+		let candidate_hash_1 = CandidateHash(Hash::repeat_byte(1));
+		let candidate_hash_2 = CandidateHash(Hash::repeat_byte(2));
+		let candidate_hash_3 = CandidateHash(Hash::repeat_byte(3));
+
+		let n_validators = 10;
+
+		let pov = PoV {
+			block_data: BlockData(vec![4, 5, 6]),
+		};
+
+		let available_data = AvailableData {
+			pov: Arc::new(pov),
+			validation_data: test_state.persisted_validation_data.clone(),
+		};
+
+		{
+			let (tx, rx) = oneshot::channel();
+			let block_msg = AvailabilityStoreMessage::StoreAvailableData(
+				candidate_hash_1,
+				None,
+				n_validators,
+				available_data,
+				tx,
+			);
+
+			virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await;
+			assert_eq!(rx.await.unwrap(), Ok(()));
+		}
+
+		{
+			with_tx(&store, |tx| {
+				super::write_meta(tx, &candidate_hash_2, &CandidateMeta {
+					data_available: false,
+					chunks_stored: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators as _],
+					state: State::Unavailable(BETimestamp(0)),
+				});
+			});
+
+			let chunk = ErasureChunk {
+				chunk: vec![1, 2, 3],
+				index: ValidatorIndex(1),
+				proof: vec![vec![3, 4, 5]],
+			};
+
+			let (tx, rx) = oneshot::channel();
+			let store_chunk_msg = AvailabilityStoreMessage::StoreChunk {
+				candidate_hash: candidate_hash_2,
+				chunk,
+				tx,
+			};
+
+			virtual_overseer.send(FromOverseer::Communication { msg: store_chunk_msg }).await;
+			assert_eq!(rx.await.unwrap(), Ok(()));
+		}
+
+		{
+			let (tx, rx) = oneshot::channel();
+
+			let msg = AvailabilityStoreMessage::QueryAllChunks(candidate_hash_1, tx);
+			virtual_overseer.send(FromOverseer::Communication { msg }).await;
+			assert_eq!(rx.await.unwrap().len(), n_validators as usize);
+		}
+
+		{
+			let (tx, rx) = oneshot::channel();
+
+			let msg = AvailabilityStoreMessage::QueryAllChunks(candidate_hash_2, tx);
+			virtual_overseer.send(FromOverseer::Communication { msg }).await;
+			assert_eq!(rx.await.unwrap().len(), 1);
+		}
+
+		{
+			let (tx, rx) = oneshot::channel();
+
+			let msg = AvailabilityStoreMessage::QueryAllChunks(candidate_hash_3, tx);
+			virtual_overseer.send(FromOverseer::Communication { msg }).await;
+			assert_eq!(rx.await.unwrap().len(), 0);
+		}
+	});
+}
+
 #[test]
 fn stored_but_not_included_data_is_pruned() {
 	let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
@@ -625,7 +713,7 @@ fn stored_data_kept_until_finalized() {
 	);
 
 	assert!(
-		query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await
+		has_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await
 	);
 
 	overseer_signal(
@@ -644,7 +732,7 @@ fn stored_data_kept_until_finalized() {
 	);
 
 	assert!(
-		query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await
+		has_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await
 	);
 
 	// Wait until it definitely should be gone.
@@ -657,7 +745,7 @@ fn stored_data_kept_until_finalized() {
 	);
 
 	assert!(
-		query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, false).await
+		has_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, false).await
 	);
 	});
 }
@@ -781,11 +869,11 @@ fn forkfullness_works() {
 	);
 
 	assert!(
-		query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await,
+		has_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await,
 	);
 
 	assert!(
-		query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, true).await,
+		has_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, true).await,
 	);
 
 	// Candidate 2 should now be considered unavailable and will be pruned.
@@ -802,11 +890,11 @@ fn forkfullness_works() {
 	);
 
 	assert!(
-		query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await,
+		has_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await,
 	);
 
 	assert!(
-		query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await,
+		has_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await,
 	);
 
 	// Wait for longer than finalized blocks should be kept for
@@ -823,11 +911,11 @@ fn forkfullness_works() {
 	);
 
 	assert!(
-		query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, false).await,
+		has_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, false).await,
 	);
 
 	assert!(
-		query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await,
+		has_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await,
 	);
 	});
 }
@@ -857,7 +945,7 @@ async fn query_chunk(
 	rx.await.unwrap()
 }
 
-async fn query_all_chunks(
+async fn has_all_chunks(
 	virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
 	candidate_hash: CandidateHash,
 	n_validators: u32,
@@ -27,7 +27,7 @@ use polkadot_node_network_protocol::request_response::{
 	v1::{ChunkFetchingRequest, ChunkFetchingResponse},
 };
 use polkadot_primitives::v1::{AuthorityDiscoveryId, BlakeTwo256, CandidateHash, GroupIndex, Hash, HashT, OccupiedCore, SessionIndex};
 use polkadot_node_primitives::ErasureChunk;
 use polkadot_subsystem::messages::{
 	AllMessages, AvailabilityStoreMessage, NetworkBridgeMessage, IfDisconnected,
 };
@@ -405,7 +405,6 @@ impl RunningTask {
 			.send(FromFetchTask::Message(AllMessages::AvailabilityStore(
 				AvailabilityStoreMessage::StoreChunk {
 					candidate_hash: self.request.candidate_hash,
-					relay_parent: self.relay_parent,
 					chunk,
 					tx,
 				},
@@ -67,7 +67,7 @@ pub struct TestState {
 	pub relay_chain: Vec<Hash>,
 	/// Whenever the subsystem tries to fetch an erasure chunk one item of the given vec will be
 	/// popped. So you can experiment with serving invalid chunks or no chunks on request and see
 	/// whether the subystem still succeds with its goal.
 	pub chunks: HashMap<(CandidateHash, ValidatorIndex), Vec<Option<ErasureChunk>>>,
 	/// All chunks that are valid and should be accepted.
 	pub valid_chunks: HashSet<(CandidateHash, ValidatorIndex)>,
@@ -334,6 +334,34 @@ impl RequestChunksPhase {
 		params: &InteractionParams,
 		sender: &mut impl SubsystemSender,
 	) -> Result<AvailableData, RecoveryError> {
+		// First query the store for any chunks we've got.
+		{
+			let (tx, rx) = oneshot::channel();
+			sender.send_message(
+				AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx).into()
+			).await;
+
+			match rx.await {
+				Ok(chunks) => {
+					// This should either be length 1 or 0. If we had the whole data,
+					// we wouldn't have reached this stage.
+					let chunk_indices: Vec<_> = chunks.iter().map(|c| c.index).collect();
+					self.shuffling.retain(|i| !chunk_indices.contains(i));
+
+					for chunk in chunks {
+						self.received_chunks.insert(chunk.index, chunk);
+					}
+				}
+				Err(oneshot::Canceled) => {
+					tracing::warn!(
+						target: LOG_TARGET,
+						candidate_hash = ?params.candidate_hash,
+						"Failed to reach the availability store"
+					);
+				}
+			}
+		}
+
 		loop {
 			if self.is_unavailable(&params) {
 				tracing::debug!(
@@ -431,6 +459,26 @@ fn reconstructed_data_matches_root(
 
 impl<S: SubsystemSender> Interaction<S> {
 	async fn run(mut self) -> Result<AvailableData, RecoveryError> {
+		// First just see if we have the data available locally.
+		{
+			let (tx, rx) = oneshot::channel();
+			self.sender.send_message(
+				AvailabilityStoreMessage::QueryAvailableData(self.params.candidate_hash, tx).into()
+			).await;
+
+			match rx.await {
+				Ok(Some(data)) => return Ok(data),
+				Ok(None) => {}
+				Err(oneshot::Canceled) => {
+					tracing::warn!(
+						target: LOG_TARGET,
+						candidate_hash = ?self.params.candidate_hash,
+						"Failed to reach the availability store",
+					)
+				}
+			}
+		}
+
 		loop {
 			// These only fail if we cannot reach the underlying subsystem, which case there is nothing
 			// meaningful we can do.
@@ -207,6 +207,46 @@ impl TestState {
 		);
 	}
 
+	async fn respond_to_available_data_query(
+		&self,
+		virtual_overseer: &mut VirtualOverseer,
+		with_data: bool,
+	) {
+		assert_matches!(
+			overseer_recv(virtual_overseer).await,
+			AllMessages::AvailabilityStore(
+				AvailabilityStoreMessage::QueryAvailableData(_, tx)
+			) => {
+				let _ = tx.send(if with_data {
+					Some(self.available_data.clone())
+				} else {
+					println!("SENDING NONE");
+					None
+				});
+			}
+		)
+	}
+
+	async fn respond_to_query_all_request(
+		&self,
+		virtual_overseer: &mut VirtualOverseer,
+		send_chunk: impl Fn(usize) -> bool
+	) {
+		assert_matches!(
+			overseer_recv(virtual_overseer).await,
+			AllMessages::AvailabilityStore(
+				AvailabilityStoreMessage::QueryAllChunks(_, tx)
+			) => {
+				let v = self.chunks.iter()
+					.filter(|c| send_chunk(c.index.0 as usize))
+					.cloned()
+					.collect();
+
+				let _ = tx.send(v);
+			}
+		)
+	}
+
 	async fn test_chunk_requests(
 		&self,
 		candidate_hash: CandidateHash,
@@ -430,6 +470,9 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() {
 
 		let candidate_hash = test_state.candidate.hash();
 
+		test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+		test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
+
 		test_state.test_chunk_requests(
 			candidate_hash,
 			&mut virtual_overseer,
@@ -459,6 +502,9 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() {
 
 		test_state.test_runtime_api(&mut virtual_overseer).await;
 
+		test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+		test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
+
 		test_state.test_chunk_requests(
 			new_candidate.hash(),
 			&mut virtual_overseer,
@@ -506,6 +552,9 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk
 
 		let candidate_hash = test_state.candidate.hash();
 
+		test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+		test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
+
 		test_state.test_chunk_requests(
 			candidate_hash,
 			&mut virtual_overseer,
@@ -535,6 +584,9 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk
 
 		test_state.test_runtime_api(&mut virtual_overseer).await;
 
+		test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+		test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
+
 		test_state.test_chunk_requests(
 			new_candidate.hash(),
 			&mut virtual_overseer,
@@ -589,6 +641,9 @@ fn bad_merkle_path_leads_to_recovery_error() {
 		test_state.chunks[3].chunk = vec![3; 32];
 		test_state.chunks[4].chunk = vec![4; 32];
 
+		test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+		test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
+
 		test_state.test_chunk_requests(
 			candidate_hash,
 			&mut virtual_overseer,
@@ -642,6 +697,9 @@ fn wrong_chunk_index_leads_to_recovery_error() {
 		test_state.chunks[3] = test_state.chunks[0].clone();
 		test_state.chunks[4] = test_state.chunks[0].clone();
 
+		test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+		test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
+
 		test_state.test_chunk_requests(
 			candidate_hash,
 			&mut virtual_overseer,
@@ -705,6 +763,9 @@ fn invalid_erasure_coding_leads_to_invalid_error() {
 
 		test_state.test_runtime_api(&mut virtual_overseer).await;
 
+		test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+		test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
+
 		test_state.test_chunk_requests(
 			candidate_hash,
 			&mut virtual_overseer,
@@ -757,6 +818,8 @@ fn fast_path_backing_group_recovers() {
 			_ => Has::No,
 		};
 
+		test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+
 		test_state.test_full_data_requests(
 			candidate_hash,
 			&mut virtual_overseer,
@@ -808,12 +871,17 @@ fn no_answers_in_fast_path_causes_chunk_requests() {
 			0 | 3 => Has::No,
 			_ => Has::timeout(),
 		};
 
+		test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+
 		test_state.test_full_data_requests(
 			candidate_hash,
 			&mut virtual_overseer,
 			who_has,
 		).await;
+
+		test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
+
 		test_state.test_chunk_requests(
 			candidate_hash,
 			&mut virtual_overseer,
@@ -905,6 +973,9 @@ fn chunks_retry_until_all_nodes_respond() {
 
 		let candidate_hash = test_state.candidate.hash();
 
+		test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+		test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
+
 		test_state.test_chunk_requests(
 			candidate_hash,
 			&mut virtual_overseer,
@@ -925,3 +996,105 @@ fn chunks_retry_until_all_nodes_respond() {
 		assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
 	});
 }
+
+#[test]
+fn returns_early_if_we_have_the_data() {
+	let test_state = TestState::default();
+
+	test_harness_chunks_only(|test_harness| async move {
+		let TestHarness { mut virtual_overseer } = test_harness;
+
+		overseer_signal(
+			&mut virtual_overseer,
+			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
+				activated: smallvec![ActivatedLeaf {
+					hash: test_state.current.clone(),
+					number: 1,
+					span: Arc::new(jaeger::Span::Disabled),
+				}],
+				deactivated: smallvec![],
+			}),
+		).await;
+
+		let (tx, rx) = oneshot::channel();
+
+		overseer_send(
+			&mut virtual_overseer,
+			AvailabilityRecoveryMessage::RecoverAvailableData(
+				test_state.candidate.clone(),
+				test_state.session_index,
+				None,
+				tx,
+			)
+		).await;
+
+		test_state.test_runtime_api(&mut virtual_overseer).await;
+		test_state.respond_to_available_data_query(&mut virtual_overseer, true).await;
+
+		assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
+	});
+}
+
+#[test]
+fn does_not_query_local_validator() {
+	let test_state = TestState::default();
+
+	test_harness_chunks_only(|test_harness| async move {
+		let TestHarness { mut virtual_overseer } = test_harness;
+
+		overseer_signal(
+			&mut virtual_overseer,
+			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
+				activated: smallvec![ActivatedLeaf {
+					hash: test_state.current.clone(),
+					number: 1,
+					span: Arc::new(jaeger::Span::Disabled),
+				}],
+				deactivated: smallvec![],
+			}),
+		).await;
+
+		let (tx, rx) = oneshot::channel();
+
+		overseer_send(
+			&mut virtual_overseer,
+			AvailabilityRecoveryMessage::RecoverAvailableData(
+				test_state.candidate.clone(),
+				test_state.session_index,
+				None,
+				tx,
+			)
+		).await;
+
+		test_state.test_runtime_api(&mut virtual_overseer).await;
+		test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+		test_state.respond_to_query_all_request(&mut virtual_overseer, |i| i == 0).await;
+
+		let candidate_hash = test_state.candidate.hash();
+
+		test_state.test_chunk_requests(
+			candidate_hash,
+			&mut virtual_overseer,
+			test_state.validators.len(),
+			|i| if i == 0 {
+				panic!("requested from local validator")
+			} else {
+				Has::timeout()
+			},
+		).await;
+
+		// second round, make sure it uses the local chunk.
+		test_state.test_chunk_requests(
+			candidate_hash,
+			&mut virtual_overseer,
+			test_state.threshold() - 1,
+			|i| if i == 0 {
+				panic!("requested from local validator")
+			} else {
+				Has::Yes
+			},
+		).await;
+
+		assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
+	});
+}
@@ -353,6 +353,9 @@ pub enum AvailabilityStoreMessage {
 	/// Query an `ErasureChunk` from the AV store by the candidate hash and validator index.
 	QueryChunk(CandidateHash, ValidatorIndex, oneshot::Sender<Option<ErasureChunk>>),
 
+	/// Query all chunks that we have for the given candidate hash.
+	QueryAllChunks(CandidateHash, oneshot::Sender<Vec<ErasureChunk>>),
+
 	/// Query whether an `ErasureChunk` exists within the AV Store.
 	///
 	/// This is useful in cases like bitfield signing, when existence
@@ -366,8 +369,6 @@ pub enum AvailabilityStoreMessage {
 	StoreChunk {
 		/// A hash of the candidate this chunk belongs to.
 		candidate_hash: CandidateHash,
-		/// A relevant relay parent.
-		relay_parent: Hash,
 		/// The chunk itself.
 		chunk: ErasureChunk,
 		/// Sending side of the channel to send result to.
@@ -120,21 +120,24 @@ Launch the interaction as a background task running `interaction_loop(interaction)`.
 const N_PARALLEL: usize = 50;
 ```
 
 Loop:
-  * If the phase is `InteractionPhase::RequestFromBackers`
+  * Request `AvailabilityStoreMessage::QueryAvailableData`. If it exists, return that.
+  * If the phase is `InteractionPhase::RequestFromBackers`
+    * Loop:
       * If the `requesting_pov` is `Some`, poll for updates on it. If it concludes, set `requesting_pov` to `None`.
       * If the `requesting_pov` is `None`, take the next backer off the `shuffled_backers`.
-        * If the backer is `Some`, issue a `FromInteraction::NetworkRequest` with a network request for the `AvailableData` and wait for the response.
+        * If the backer is `Some`, issue a `NetworkBridgeMessage::Requests` with a network request for the `AvailableData` and wait for the response.
         * If it concludes with a `None` result, return to beginning.
         * If it concludes with available data, attempt a re-encoding.
           * If it has the correct erasure-root, break and issue a `Ok(available_data)`.
-          * If it has an incorrect erasure-root, issue a `FromInteraction::ReportPeer` message and return to beginning.
-        * If the backer is `None`, set the phase to `InteractionPhase::RequestChunks` with a random shuffling of validators and empty `next_shuffling`, `received_chunks`, and `requesting_chunks`.
+          * If it has an incorrect erasure-root, return to beginning.
+        * If the backer is `None`, set the phase to `InteractionPhase::RequestChunks` with a random shuffling of validators and empty `next_shuffling`, `received_chunks`, and `requesting_chunks` and break the loop.
 
   * If the phase is `InteractionPhase::RequestChunks`:
+    * Request `AvailabilityStoreMessage::QueryAllChunks`. For each chunk that exists, add it to `received_chunks` and remove the validator from `shuffling`.
+    * Loop:
       * If `received_chunks + requesting_chunks + shuffling` lengths are less than the threshold, break and return `Err(Unavailable)`.
       * Poll for new updates from `requesting_chunks`. Check merkle proofs of any received chunks. If the request simply fails due to network issues, insert into the front of `shuffling` to be retried.
      * If `received_chunks` has more than `threshold` entries, attempt to recover the data. If that fails, or a re-encoding produces an incorrect erasure-root, break and issue a `Err(RecoveryError::Invalid)`. If correct, break and issue `Ok(available_data)`.
      * While there are fewer than `N_PARALLEL` entries in `requesting_chunks`,
        * Pop the next item from `shuffling`. If it's empty and `requesting_chunks` is empty, return `Err(RecoveryError::Unavailable)`.
-        * Issue a `FromInteraction::NetworkRequest` and wait for the response in `requesting_chunks`.
+        * Issue a `NetworkBridgeMessage::Requests` and wait for the response in `requesting_chunks`.
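A hedged sketch of the bookkeeping described in the loop above, using plain counters in place of the real chunk and request collections. The threshold formula shown is the usual "enough chunks to reconstruct" bound (roughly one third of the validators plus one) and is an assumption here, not quoted from the erasure-coding crate.

```rust
// Model of the "break and return Err(Unavailable)" condition from the loop:
// even if every outstanding and not-yet-tried validator answered, we could
// no longer reach the reconstruction threshold.
fn recovery_threshold(n_validators: usize) -> usize {
    // Assumed approximation of the real formula.
    n_validators.saturating_sub(1) / 3 + 1
}

fn is_unavailable(
    received_chunks: usize,
    requesting_chunks: usize,
    remaining_shuffling: usize,
    threshold: usize,
) -> bool {
    received_chunks + requesting_chunks + remaining_shuffling < threshold
}

fn main() {
    let threshold = recovery_threshold(10); // 4 for a 10-validator set
    assert!(!is_unavailable(2, 1, 1, threshold)); // 4 possible chunks: still recoverable
    assert!(is_unavailable(1, 1, 1, threshold));  // only 3 possible: give up
}
```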
@@ -133,6 +133,10 @@ On `QueryChunk` message:
 
 This is `O(n)` in the size of the data, which may be large.
 
+On `QueryAllChunks` message:
+- Query `("meta", candidate_hash)`. If `None`, send an empty response and return.
+- For all `1` bits in the `chunks_stored`, query `("chunk", candidate_hash, index)`. Ignore but warn on errors, and return a vector of all loaded chunks.
+
 On `QueryChunkAvailability` message:
 
 - Query whether `("meta", candidate_hash)` exists and the bit at `index` is set.
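A toy model (not the av-store implementation) of the `QueryAllChunks` handling described above, with in-memory maps standing in for the `("meta", ...)` and `("chunk", ...)` columns and `Vec<bool>` standing in for the stored bitfield.

```rust
use std::collections::HashMap;

struct Meta {
    chunks_stored: Vec<bool>, // one bit per validator index
}

fn query_all_chunks(
    meta_db: &HashMap<[u8; 32], Meta>,
    chunk_db: &HashMap<([u8; 32], usize), Vec<u8>>,
    candidate: [u8; 32],
) -> Vec<Vec<u8>> {
    // No meta entry: empty response.
    let meta = match meta_db.get(&candidate) {
        None => return Vec::new(),
        Some(m) => m,
    };
    let mut chunks = Vec::new();
    // Load a chunk for every set bit; warn (here: print) if a set bit has no chunk.
    for (index, _) in meta.chunks_stored.iter().enumerate().filter(|(_, stored)| **stored) {
        match chunk_db.get(&(candidate, index)) {
            Some(c) => chunks.push(c.clone()),
            None => eprintln!("no chunk found for set bit {index} in meta"),
        }
    }
    chunks
}

fn main() {
    let candidate = [7u8; 32];
    let mut meta_db = HashMap::new();
    meta_db.insert(candidate, Meta { chunks_stored: vec![true, false, true] });
    let mut chunk_db = HashMap::new();
    chunk_db.insert((candidate, 0), vec![1]);
    chunk_db.insert((candidate, 2), vec![2]);
    assert_eq!(query_all_chunks(&meta_db, &chunk_db, candidate).len(), 2);
}
```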
@@ -204,12 +204,14 @@ enum AvailabilityStoreMessage {
 	QueryDataAvailability(CandidateHash, ResponseChannel<bool>),
 	/// Query a specific availability chunk of the candidate's erasure-coding by validator index.
 	/// Returns the chunk and its inclusion proof against the candidate's erasure-root.
-	QueryChunk(CandidateHash, ValidatorIndex, ResponseChannel<Option<AvailabilityChunkAndProof>>),
+	QueryChunk(CandidateHash, ValidatorIndex, ResponseChannel<Option<ErasureChunk>>),
+	/// Query all chunks that we have locally for the given candidate hash.
+	QueryAllChunks(CandidateHash, ResponseChannel<Vec<ErasureChunk>>),
 	/// Store a specific chunk of the candidate's erasure-coding by validator index, with an
 	/// accompanying proof.
-	StoreChunk(CandidateHash, ValidatorIndex, AvailabilityChunkAndProof, ResponseChannel<Result<()>>),
+	StoreChunk(CandidateHash, ErasureChunk, ResponseChannel<Result<()>>),
 	/// Store `AvailableData`. If `ValidatorIndex` is provided, also store this validator's
-	/// `AvailabilityChunkAndProof`.
+	/// `ErasureChunk`.
 	StoreAvailableData(CandidateHash, Option<ValidatorIndex>, u32, AvailableData, ResponseChannel<Result<()>>),
 }
 ```
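A hedged usage sketch for the changed variants above, with std `mpsc` channels standing in for the oneshot `ResponseChannel`s and simplified structs for the real types; the in-memory handler is only a stand-in for the availability-store subsystem.

```rust
use std::sync::mpsc;

#[derive(Clone, Debug)]
struct ErasureChunk {
    chunk: Vec<u8>,
    index: u32,
    proof: Vec<Vec<u8>>,
}

enum AvailabilityStoreMessage {
    // StoreChunk now carries only the candidate hash and the chunk (no relay parent).
    StoreChunk([u8; 32], ErasureChunk, mpsc::Sender<Result<(), ()>>),
    // QueryAllChunks returns every chunk stored locally for the candidate.
    QueryAllChunks([u8; 32], mpsc::Sender<Vec<ErasureChunk>>),
}

fn main() {
    let candidate = [1u8; 32];
    let mut stored: Vec<ErasureChunk> = Vec::new();

    // A trivial in-memory "store" that handles both messages.
    let mut handle = |msg: AvailabilityStoreMessage| match msg {
        AvailabilityStoreMessage::StoreChunk(_, chunk, tx) => {
            stored.push(chunk);
            let _ = tx.send(Ok(()));
        }
        AvailabilityStoreMessage::QueryAllChunks(_, tx) => {
            let _ = tx.send(stored.clone());
        }
    };

    let (tx, rx) = mpsc::channel();
    handle(AvailabilityStoreMessage::StoreChunk(
        candidate,
        ErasureChunk { chunk: vec![1, 2, 3], index: 1, proof: vec![] },
        tx,
    ));
    assert_eq!(rx.recv().unwrap(), Ok(()));

    let (tx, rx) = mpsc::channel();
    handle(AvailabilityStoreMessage::QueryAllChunks(candidate, tx));
    assert_eq!(rx.recv().unwrap().len(), 1);
}
```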