From 5da762e7285f95b9ece1a8cf0720dc49bab64f0e Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 1 Apr 2021 15:57:41 +0200 Subject: [PATCH] Avoid querying the local validator in availability recovery (#2792) * guide: don't request availability data from ourselves * add QueryAllChunks message * implement QueryAllChunks * remove unused relay_parent from StoreChunk * test QueryAllChunks * fast paths make short roads * test early exit behavior --- polkadot/node/core/av-store/src/lib.rs | 28 ++- polkadot/node/core/av-store/src/tests.rs | 116 ++++++++++-- .../src/requester/fetch_task/mod.rs | 3 +- .../src/tests/state.rs | 2 +- .../network/availability-recovery/src/lib.rs | 48 +++++ .../availability-recovery/src/tests.rs | 173 ++++++++++++++++++ polkadot/node/subsystem/src/messages.rs | 5 +- .../availability/availability-recovery.md | 19 +- .../src/node/utility/availability-store.md | 4 + .../src/types/overseer-protocol.md | 8 +- 10 files changed, 375 insertions(+), 31 deletions(-) diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 6d3e07c9d3..4e6a4e3c65 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -980,6 +980,33 @@ fn process_message( let _timer = subsystem.metrics.time_get_chunk(); let _ = tx.send(load_chunk(&subsystem.db, &candidate, validator_index)?); } + AvailabilityStoreMessage::QueryAllChunks(candidate, tx) => { + match load_meta(&subsystem.db, &candidate)? { + None => { + let _ = tx.send(Vec::new()); + } + Some(meta) => { + let mut chunks = Vec::new(); + + for (index, _) in meta.chunks_stored.iter().enumerate().filter(|(_, b)| **b) { + let _timer = subsystem.metrics.time_get_chunk(); + match load_chunk(&subsystem.db, &candidate, ValidatorIndex(index as _))? 
{ + Some(c) => chunks.push(c), + None => { + tracing::warn!( + target: LOG_TARGET, + ?candidate, + index, + "No chunk found for set bit in meta" + ); + } + } + } + + let _ = tx.send(chunks); + } + } + } AvailabilityStoreMessage::QueryChunkAvailability(candidate, validator_index, tx) => { let a = load_meta(&subsystem.db, &candidate)? .map_or(false, |m| @@ -989,7 +1016,6 @@ fn process_message( } AvailabilityStoreMessage::StoreChunk { candidate_hash, - relay_parent: _, chunk, tx, } => { diff --git a/polkadot/node/core/av-store/src/tests.rs b/polkadot/node/core/av-store/src/tests.rs index 939ea4a8e0..63fd628885 100644 --- a/polkadot/node/core/av-store/src/tests.rs +++ b/polkadot/node/core/av-store/src/tests.rs @@ -284,7 +284,6 @@ fn store_chunk_works() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); test_harness(TestState::default(), store.clone(), |test_harness| async move { let TestHarness { mut virtual_overseer } = test_harness; - let relay_parent = Hash::repeat_byte(32); let candidate_hash = CandidateHash(Hash::repeat_byte(33)); let validator_index = ValidatorIndex(5); let n_validators = 10; @@ -309,7 +308,6 @@ fn store_chunk_works() { let chunk_msg = AvailabilityStoreMessage::StoreChunk { candidate_hash, - relay_parent, chunk: chunk.clone(), tx, }; @@ -336,7 +334,6 @@ fn store_chunk_does_nothing_if_no_entry_already() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); test_harness(TestState::default(), store.clone(), |test_harness| async move { let TestHarness { mut virtual_overseer } = test_harness; - let relay_parent = Hash::repeat_byte(32); let candidate_hash = CandidateHash(Hash::repeat_byte(33)); let validator_index = ValidatorIndex(5); @@ -350,7 +347,6 @@ fn store_chunk_does_nothing_if_no_entry_already() { let chunk_msg = AvailabilityStoreMessage::StoreChunk { candidate_hash, - relay_parent, chunk: chunk.clone(), tx, }; @@ -510,6 +506,98 @@ fn store_pov_and_query_chunk_works() { }); } +#[test] +fn 
query_all_chunks_works() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + let test_state = TestState::default(); + + test_harness(test_state.clone(), store.clone(), |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + + // all chunks for hash 1. + // 1 chunk for hash 2. + // 0 chunks for hash 3. + let candidate_hash_1 = CandidateHash(Hash::repeat_byte(1)); + let candidate_hash_2 = CandidateHash(Hash::repeat_byte(2)); + let candidate_hash_3 = CandidateHash(Hash::repeat_byte(3)); + + let n_validators = 10; + + let pov = PoV { + block_data: BlockData(vec![4, 5, 6]), + }; + + let available_data = AvailableData { + pov: Arc::new(pov), + validation_data: test_state.persisted_validation_data.clone(), + }; + + { + let (tx, rx) = oneshot::channel(); + let block_msg = AvailabilityStoreMessage::StoreAvailableData( + candidate_hash_1, + None, + n_validators, + available_data, + tx, + ); + + virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await; + assert_eq!(rx.await.unwrap(), Ok(())); + } + + { + with_tx(&store, |tx| { + super::write_meta(tx, &candidate_hash_2, &CandidateMeta { + data_available: false, + chunks_stored: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators as _], + state: State::Unavailable(BETimestamp(0)), + }); + }); + + let chunk = ErasureChunk { + chunk: vec![1, 2, 3], + index: ValidatorIndex(1), + proof: vec![vec![3, 4, 5]], + }; + + let (tx, rx) = oneshot::channel(); + let store_chunk_msg = AvailabilityStoreMessage::StoreChunk { + candidate_hash: candidate_hash_2, + chunk, + tx, + }; + + virtual_overseer.send(FromOverseer::Communication { msg: store_chunk_msg }).await; + assert_eq!(rx.await.unwrap(), Ok(())); + } + + { + let (tx, rx) = oneshot::channel(); + + let msg = AvailabilityStoreMessage::QueryAllChunks(candidate_hash_1, tx); + virtual_overseer.send(FromOverseer::Communication { msg }).await; + assert_eq!(rx.await.unwrap().len(), n_validators as usize); + } + + { + let 
(tx, rx) = oneshot::channel(); + + let msg = AvailabilityStoreMessage::QueryAllChunks(candidate_hash_2, tx); + virtual_overseer.send(FromOverseer::Communication { msg }).await; + assert_eq!(rx.await.unwrap().len(), 1); + } + + { + let (tx, rx) = oneshot::channel(); + + let msg = AvailabilityStoreMessage::QueryAllChunks(candidate_hash_3, tx); + virtual_overseer.send(FromOverseer::Communication { msg }).await; + assert_eq!(rx.await.unwrap().len(), 0); + } + }); +} + #[test] fn stored_but_not_included_data_is_pruned() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); @@ -625,7 +713,7 @@ fn stored_data_kept_until_finalized() { ); assert!( - query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await + has_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await ); overseer_signal( @@ -644,7 +732,7 @@ fn stored_data_kept_until_finalized() { ); assert!( - query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await + has_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await ); // Wait until it definitely should be gone. @@ -657,7 +745,7 @@ fn stored_data_kept_until_finalized() { ); assert!( - query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, false).await + has_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, false).await ); }); } @@ -781,11 +869,11 @@ fn forkfullness_works() { ); assert!( - query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await, + has_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await, ); assert!( - query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, true).await, + has_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, true).await, ); // Candidate 2 should now be considered unavailable and will be pruned. 
@@ -802,11 +890,11 @@ fn forkfullness_works() { ); assert!( - query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await, + has_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await, ); assert!( - query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await, + has_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await, ); // Wait for longer than finalized blocks should be kept for @@ -823,11 +911,11 @@ fn forkfullness_works() { ); assert!( - query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, false).await, + has_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, false).await, ); assert!( - query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await, + has_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await, ); }); } @@ -857,7 +945,7 @@ async fn query_chunk( rx.await.unwrap() } -async fn query_all_chunks( +async fn has_all_chunks( virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, candidate_hash: CandidateHash, n_validators: u32, diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index 04b50a78ff..c5527eedc4 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -27,7 +27,7 @@ use polkadot_node_network_protocol::request_response::{ v1::{ChunkFetchingRequest, ChunkFetchingResponse}, }; use polkadot_primitives::v1::{AuthorityDiscoveryId, BlakeTwo256, CandidateHash, GroupIndex, Hash, HashT, OccupiedCore, SessionIndex}; -use polkadot_node_primitives::ErasureChunk; +use polkadot_node_primitives::ErasureChunk; use polkadot_subsystem::messages::{ AllMessages, AvailabilityStoreMessage, NetworkBridgeMessage, IfDisconnected, }; @@ 
-405,7 +405,6 @@ impl RunningTask { .send(FromFetchTask::Message(AllMessages::AvailabilityStore( AvailabilityStoreMessage::StoreChunk { candidate_hash: self.request.candidate_hash, - relay_parent: self.relay_parent, chunk, tx, }, diff --git a/polkadot/node/network/availability-distribution/src/tests/state.rs b/polkadot/node/network/availability-distribution/src/tests/state.rs index b1a1850024..2adbdaba84 100644 --- a/polkadot/node/network/availability-distribution/src/tests/state.rs +++ b/polkadot/node/network/availability-distribution/src/tests/state.rs @@ -67,7 +67,7 @@ pub struct TestState { pub relay_chain: Vec, /// Whenever the subsystem tries to fetch an erasure chunk one item of the given vec will be /// popped. So you can experiment with serving invalid chunks or no chunks on request and see - /// whether the subystem still succeds with its goal. + /// whether the subystem still succeds with its goal. pub chunks: HashMap<(CandidateHash, ValidatorIndex), Vec>>, /// All chunks that are valid and should be accepted. pub valid_chunks: HashSet<(CandidateHash, ValidatorIndex)>, diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 9f97c87bed..3a8474de8a 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -334,6 +334,34 @@ impl RequestChunksPhase { params: &InteractionParams, sender: &mut impl SubsystemSender, ) -> Result { + // First query the store for any chunks we've got. + { + let (tx, rx) = oneshot::channel(); + sender.send_message( + AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx).into() + ).await; + + match rx.await { + Ok(chunks) => { + // This should either be length 1 or 0. If we had the whole data, + // we wouldn't have reached this stage. 
+ let chunk_indices: Vec<_> = chunks.iter().map(|c| c.index).collect(); + self.shuffling.retain(|i| !chunk_indices.contains(i)); + + for chunk in chunks { + self.received_chunks.insert(chunk.index, chunk); + } + } + Err(oneshot::Canceled) => { + tracing::warn!( + target: LOG_TARGET, + candidate_hash = ?params.candidate_hash, + "Failed to reach the availability store" + ); + } + } + } + loop { if self.is_unavailable(¶ms) { tracing::debug!( @@ -431,6 +459,26 @@ fn reconstructed_data_matches_root( impl Interaction { async fn run(mut self) -> Result { + // First just see if we have the data available locally. + { + let (tx, rx) = oneshot::channel(); + self.sender.send_message( + AvailabilityStoreMessage::QueryAvailableData(self.params.candidate_hash, tx).into() + ).await; + + match rx.await { + Ok(Some(data)) => return Ok(data), + Ok(None) => {} + Err(oneshot::Canceled) => { + tracing::warn!( + target: LOG_TARGET, + candidate_hash = ?self.params.candidate_hash, + "Failed to reach the availability store", + ) + } + } + } + loop { // These only fail if we cannot reach the underlying subsystem, which case there is nothing // meaningful we can do. 
diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs index ff14cf0706..028928f853 100644 --- a/polkadot/node/network/availability-recovery/src/tests.rs +++ b/polkadot/node/network/availability-recovery/src/tests.rs @@ -207,6 +207,46 @@ impl TestState { ); } + async fn respond_to_available_data_query( + &self, + virtual_overseer: &mut VirtualOverseer, + with_data: bool, + ) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::AvailabilityStore( + AvailabilityStoreMessage::QueryAvailableData(_, tx) + ) => { + let _ = tx.send(if with_data { + Some(self.available_data.clone()) + } else { + println!("SENDING NONE"); + None + }); + } + ) + } + + async fn respond_to_query_all_request( + &self, + virtual_overseer: &mut VirtualOverseer, + send_chunk: impl Fn(usize) -> bool + ) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::AvailabilityStore( + AvailabilityStoreMessage::QueryAllChunks(_, tx) + ) => { + let v = self.chunks.iter() + .filter(|c| send_chunk(c.index.0 as usize)) + .cloned() + .collect(); + + let _ = tx.send(v); + } + ) + } + async fn test_chunk_requests( &self, candidate_hash: CandidateHash, @@ -430,6 +470,9 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() { let candidate_hash = test_state.candidate.hash(); + test_state.respond_to_available_data_query(&mut virtual_overseer, false).await; + test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await; + test_state.test_chunk_requests( candidate_hash, &mut virtual_overseer, @@ -459,6 +502,9 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() { test_state.test_runtime_api(&mut virtual_overseer).await; + test_state.respond_to_available_data_query(&mut virtual_overseer, false).await; + test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await; + test_state.test_chunk_requests( new_candidate.hash(), &mut 
virtual_overseer, @@ -506,6 +552,9 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk let candidate_hash = test_state.candidate.hash(); + test_state.respond_to_available_data_query(&mut virtual_overseer, false).await; + test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await; + test_state.test_chunk_requests( candidate_hash, &mut virtual_overseer, @@ -535,6 +584,9 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk test_state.test_runtime_api(&mut virtual_overseer).await; + test_state.respond_to_available_data_query(&mut virtual_overseer, false).await; + test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await; + test_state.test_chunk_requests( new_candidate.hash(), &mut virtual_overseer, @@ -589,6 +641,9 @@ fn bad_merkle_path_leads_to_recovery_error() { test_state.chunks[3].chunk = vec![3; 32]; test_state.chunks[4].chunk = vec![4; 32]; + test_state.respond_to_available_data_query(&mut virtual_overseer, false).await; + test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await; + test_state.test_chunk_requests( candidate_hash, &mut virtual_overseer, @@ -642,6 +697,9 @@ fn wrong_chunk_index_leads_to_recovery_error() { test_state.chunks[3] = test_state.chunks[0].clone(); test_state.chunks[4] = test_state.chunks[0].clone(); + test_state.respond_to_available_data_query(&mut virtual_overseer, false).await; + test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await; + test_state.test_chunk_requests( candidate_hash, &mut virtual_overseer, @@ -705,6 +763,9 @@ fn invalid_erasure_coding_leads_to_invalid_error() { test_state.test_runtime_api(&mut virtual_overseer).await; + test_state.respond_to_available_data_query(&mut virtual_overseer, false).await; + test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await; + test_state.test_chunk_requests( candidate_hash, &mut virtual_overseer, @@ -757,6 +818,8 
@@ fn fast_path_backing_group_recovers() { _ => Has::No, }; + test_state.respond_to_available_data_query(&mut virtual_overseer, false).await; + test_state.test_full_data_requests( candidate_hash, &mut virtual_overseer, @@ -808,12 +871,17 @@ fn no_answers_in_fast_path_causes_chunk_requests() { 0 | 3 => Has::No, _ => Has::timeout(), }; + + test_state.respond_to_available_data_query(&mut virtual_overseer, false).await; + test_state.test_full_data_requests( candidate_hash, &mut virtual_overseer, who_has, ).await; + test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await; + test_state.test_chunk_requests( candidate_hash, &mut virtual_overseer, @@ -905,6 +973,9 @@ fn chunks_retry_until_all_nodes_respond() { let candidate_hash = test_state.candidate.hash(); + test_state.respond_to_available_data_query(&mut virtual_overseer, false).await; + test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await; + test_state.test_chunk_requests( candidate_hash, &mut virtual_overseer, @@ -925,3 +996,105 @@ fn chunks_retry_until_all_nodes_respond() { assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable); }); } + +#[test] +fn returns_early_if_we_have_the_data() { + let test_state = TestState::default(); + + test_harness_chunks_only(|test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![ActivatedLeaf { + hash: test_state.current.clone(), + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }], + deactivated: smallvec![], + }), + ).await; + + let (tx, rx) = oneshot::channel(); + + overseer_send( + &mut virtual_overseer, + AvailabilityRecoveryMessage::RecoverAvailableData( + test_state.candidate.clone(), + test_state.session_index, + None, + tx, + ) + ).await; + + test_state.test_runtime_api(&mut virtual_overseer).await; + test_state.respond_to_available_data_query(&mut 
virtual_overseer, true).await; + + assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data); + }); +} + +#[test] +fn does_not_query_local_validator() { + let test_state = TestState::default(); + + test_harness_chunks_only(|test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![ActivatedLeaf { + hash: test_state.current.clone(), + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }], + deactivated: smallvec![], + }), + ).await; + + let (tx, rx) = oneshot::channel(); + + overseer_send( + &mut virtual_overseer, + AvailabilityRecoveryMessage::RecoverAvailableData( + test_state.candidate.clone(), + test_state.session_index, + None, + tx, + ) + ).await; + + test_state.test_runtime_api(&mut virtual_overseer).await; + test_state.respond_to_available_data_query(&mut virtual_overseer, false).await; + test_state.respond_to_query_all_request(&mut virtual_overseer, |i| i == 0).await; + + let candidate_hash = test_state.candidate.hash(); + + test_state.test_chunk_requests( + candidate_hash, + &mut virtual_overseer, + test_state.validators.len(), + |i| if i == 0 { + panic!("requested from local validator") + } else { + Has::timeout() + }, + ).await; + + // second round, make sure it uses the local chunk. 
+ test_state.test_chunk_requests( + candidate_hash, + &mut virtual_overseer, + test_state.threshold() - 1, + |i| if i == 0 { + panic!("requested from local validator") + } else { + Has::Yes + }, + ).await; + + assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data); + }); +} diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 06d41def80..d9a17bb3da 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -353,6 +353,9 @@ pub enum AvailabilityStoreMessage { /// Query an `ErasureChunk` from the AV store by the candidate hash and validator index. QueryChunk(CandidateHash, ValidatorIndex, oneshot::Sender>), + /// Query all chunks that we have for the given candidate hash. + QueryAllChunks(CandidateHash, oneshot::Sender>), + /// Query whether an `ErasureChunk` exists within the AV Store. /// /// This is useful in cases like bitfield signing, when existence @@ -366,8 +369,6 @@ pub enum AvailabilityStoreMessage { StoreChunk { /// A hash of the candidate this chunk belongs to. candidate_hash: CandidateHash, - /// A relevant relay parent. - relay_parent: Hash, /// The chunk itself. chunk: ErasureChunk, /// Sending side of the channel to send result to. diff --git a/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md b/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md index 609282789e..19004f456c 100644 --- a/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md +++ b/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md @@ -120,21 +120,24 @@ Launch the interaction as a background task running `interaction_loop(interactio const N_PARALLEL: usize = 50; ``` -Loop: - * If the phase is `InteractionPhase::RequestFromBackers` +* Request `AvailabilityStoreMessage::QueryAvailableData`. If it exists, return that. 
+* If the phase is `InteractionPhase::RequestFromBackers` + * Loop: * If the `requesting_pov` is `Some`, poll for updates on it. If it concludes, set `requesting_pov` to `None`. * If the `requesting_pov` is `None`, take the next backer off the `shuffled_backers`. - * If the backer is `Some`, issue a `FromInteraction::NetworkRequest` with a network request for the `AvailableData` and wait for the response. + * If the backer is `Some`, issue a `NetworkBridgeMessage::Requests` with a network request for the `AvailableData` and wait for the response. * If it concludes with a `None` result, return to beginning. * If it concludes with available data, attempt a re-encoding. * If it has the correct erasure-root, break and issue a `Ok(available_data)`. - * If it has an incorrect erasure-root, issue a `FromInteraction::ReportPeer` message and return to beginning. - * If the backer is `None`, set the phase to `InteractionPhase::RequestChunks` with a random shuffling of validators and empty `next_shuffling`, `received_chunks`, and `requesting_chunks`. + * If it has an incorrect erasure-root, return to beginning. + * If the backer is `None`, set the phase to `InteractionPhase::RequestChunks` with a random shuffling of validators and empty `next_shuffling`, `received_chunks`, and `requesting_chunks` and break the loop. - * If the phase is `InteractionPhase::RequestChunks`: +* If the phase is `InteractionPhase::RequestChunks`: + * Request `AvailabilityStoreMessage::QueryAllChunks`. For each chunk that exists, add it to `received_chunks` and remove the validator from `shuffling`. + * Loop: * If `received_chunks + requesting_chunks + shuffling` lengths are less than the threshold, break and return `Err(Unavailable)`. * Poll for new updates from `requesting_chunks`. Check merkle proofs of any received chunks. If the request simply fails due to network issues, insert into the front of `shuffling` to be retried. 
* If `received_chunks` has more than `threshold` entries, attempt to recover the data. If that fails, or a re-encoding produces an incorrect erasure-root, break and issue a `Err(RecoveryError::Invalid)`. If correct, break and issue `Ok(available_data)`. * While there are fewer than `N_PARALLEL` entries in `requesting_chunks`, - * Pop the next item from `shuffling`. If it's empty and `requesting_chunks` is empty, return `Err(RecoveryError::Unavailable)`. - * Issue a `FromInteraction::NetworkRequest` and wait for the response in `requesting_chunks`. + * Pop the next item from `shuffling`. If it's empty and `requesting_chunks` is empty, return `Err(RecoveryError::Unavailable)`. + * Issue a `NetworkBridgeMessage::Requests` and wait for the response in `requesting_chunks`. diff --git a/polkadot/roadmap/implementers-guide/src/node/utility/availability-store.md b/polkadot/roadmap/implementers-guide/src/node/utility/availability-store.md index 7d2ec5e7bb..e4f94cdcb7 100644 --- a/polkadot/roadmap/implementers-guide/src/node/utility/availability-store.md +++ b/polkadot/roadmap/implementers-guide/src/node/utility/availability-store.md @@ -133,6 +133,10 @@ On `QueryChunk` message: This is `O(n)` in the size of the data, which may be large. +On `QueryAllChunks` message: + - Query `("meta", candidate_hash)`. If `None`, send an empty response and return. + - For all `1` bits in the `chunks_stored`, query `("chunk", candidate_hash, index)`. Ignore but warn on errors, and return a vector of all loaded chunks. + On `QueryChunkAvailability message: - Query whether `("meta", candidate_hash)` exists and the bit at `index` is set. 
diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md index 2ac0b1e124..c4d51a6110 100644 --- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -204,12 +204,14 @@ enum AvailabilityStoreMessage { QueryDataAvailability(CandidateHash, ResponseChannel), /// Query a specific availability chunk of the candidate's erasure-coding by validator index. /// Returns the chunk and its inclusion proof against the candidate's erasure-root. - QueryChunk(CandidateHash, ValidatorIndex, ResponseChannel>), + QueryChunk(CandidateHash, ValidatorIndex, ResponseChannel>), + /// Query all chunks that we have locally for the given candidate hash. + QueryAllChunks(CandidateHash, ResponseChannel>), /// Store a specific chunk of the candidate's erasure-coding by validator index, with an /// accompanying proof. - StoreChunk(CandidateHash, ValidatorIndex, AvailabilityChunkAndProof, ResponseChannel>), + StoreChunk(CandidateHash, ErasureChunk, ResponseChannel>), /// Store `AvailableData`. If `ValidatorIndex` is provided, also store this validator's - /// `AvailabilityChunkAndProof`. + /// `ErasureChunk`. StoreAvailableData(CandidateHash, Option, u32, AvailableData, ResponseChannel>), } ```