diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 0a4a36cac8..7492465574 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5407,6 +5407,7 @@ dependencies = [ "futures-timer 3.0.2", "log", "lru", + "parity-scale-codec", "polkadot-erasure-coding", "polkadot-node-network-protocol", "polkadot-node-subsystem", @@ -5414,11 +5415,11 @@ dependencies = [ "polkadot-node-subsystem-util", "polkadot-primitives", "rand 0.8.3", + "sc-network", "smallvec 1.6.1", "sp-application-crypto", "sp-core", "sp-keyring", - "streamunordered", "thiserror", "tracing", ] diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 07f3db51b5..3dd7b4e69c 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -121,7 +121,7 @@ impl AvailabilityDistributionSubsystem { return Ok(()); } FromOverseer::Communication { - msg: AvailabilityDistributionMessage::AvailabilityFetchingRequest(req), + msg: AvailabilityDistributionMessage::ChunkFetchingRequest(req), } => { answer_request_log(&mut ctx, req, &self.metrics).await } diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index 6096081ed2..c4e539ab23 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -24,7 +24,7 @@ use futures::{FutureExt, SinkExt}; use polkadot_erasure_coding::branch_hash; use polkadot_node_network_protocol::request_response::{ request::{OutgoingRequest, RequestError, Requests, Recipient}, - v1::{AvailabilityFetchingRequest, AvailabilityFetchingResponse}, + v1::{ChunkFetchingRequest, ChunkFetchingResponse}, }; use polkadot_primitives::v1::{ AuthorityDiscoveryId, BlakeTwo256, ErasureChunk, GroupIndex, 
Hash, HashT, OccupiedCore, @@ -106,7 +106,7 @@ struct RunningTask { group: Vec, /// The request to send. - request: AvailabilityFetchingRequest, + request: ChunkFetchingRequest, /// Root hash, for verifying the chunks validity. erasure_root: Hash, @@ -154,7 +154,7 @@ impl FetchTaskConfig { group: session_info.validator_groups.get(core.group_responsible.0 as usize) .expect("The responsible group of a candidate should be available in the corresponding session. qed.") .clone(), - request: AvailabilityFetchingRequest { + request: ChunkFetchingRequest { candidate_hash: core.candidate_hash, index: session_info.our_index, }, @@ -292,10 +292,10 @@ impl RunningTask { } }; let chunk = match resp { - AvailabilityFetchingResponse::Chunk(resp) => { + ChunkFetchingResponse::Chunk(resp) => { resp.recombine_into_chunk(&self.request) } - AvailabilityFetchingResponse::NoSuchChunk => { + ChunkFetchingResponse::NoSuchChunk => { tracing::debug!( target: LOG_TARGET, validator = ?validator, @@ -327,10 +327,10 @@ impl RunningTask { async fn do_request( &mut self, validator: &AuthorityDiscoveryId, - ) -> std::result::Result { + ) -> std::result::Result { let (full_request, response_recv) = OutgoingRequest::new(Recipient::Authority(validator.clone()), self.request); - let requests = Requests::AvailabilityFetching(full_request); + let requests = Requests::ChunkFetching(full_request); self.sender .send(FromFetchTask::Message(AllMessages::NetworkBridge( diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs index 4fe314cee3..76c247fb05 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs @@ -58,7 +58,7 @@ fn task_does_not_accept_invalid_chunk() { let mut m = HashMap::new(); m.insert( Recipient::Authority(Sr25519Keyring::Alice.public().into()), 
- AvailabilityFetchingResponse::Chunk( + ChunkFetchingResponse::Chunk( v1::ChunkResponse { chunk: vec![1,2,3], proof: vec![vec![9,8,2], vec![2,3,4]], @@ -90,7 +90,7 @@ fn task_stores_valid_chunk() { let mut m = HashMap::new(); m.insert( Recipient::Authority(Sr25519Keyring::Alice.public().into()), - AvailabilityFetchingResponse::Chunk( + ChunkFetchingResponse::Chunk( v1::ChunkResponse { chunk: chunk.chunk.clone(), proof: chunk.proof, @@ -126,7 +126,7 @@ fn task_does_not_accept_wrongly_indexed_chunk() { let mut m = HashMap::new(); m.insert( Recipient::Authority(Sr25519Keyring::Alice.public().into()), - AvailabilityFetchingResponse::Chunk( + ChunkFetchingResponse::Chunk( v1::ChunkResponse { chunk: chunk.chunk.clone(), proof: chunk.proof, @@ -165,7 +165,7 @@ fn task_stores_valid_chunk_if_there_is_one() { let mut m = HashMap::new(); m.insert( Recipient::Authority(Sr25519Keyring::Alice.public().into()), - AvailabilityFetchingResponse::Chunk( + ChunkFetchingResponse::Chunk( v1::ChunkResponse { chunk: chunk.chunk.clone(), proof: chunk.proof, @@ -174,11 +174,11 @@ fn task_stores_valid_chunk_if_there_is_one() { ); m.insert( Recipient::Authority(Sr25519Keyring::Bob.public().into()), - AvailabilityFetchingResponse::NoSuchChunk + ChunkFetchingResponse::NoSuchChunk ); m.insert( Recipient::Authority(Sr25519Keyring::Charlie.public().into()), - AvailabilityFetchingResponse::Chunk( + ChunkFetchingResponse::Chunk( v1::ChunkResponse { chunk: vec![1,2,3], proof: vec![vec![9,8,2], vec![2,3,4]], @@ -200,7 +200,7 @@ fn task_stores_valid_chunk_if_there_is_one() { struct TestRun { /// Response to deliver for a given validator index. /// None means, answer with NetworkError. 
- chunk_responses: HashMap, + chunk_responses: HashMap, /// Set of chunks that should be considered valid: valid_chunks: HashSet>, } @@ -227,7 +227,7 @@ impl TestRun { ); match msg { FromFetchTask::Concluded(_) => break, - FromFetchTask::Message(msg) => + FromFetchTask::Message(msg) => end_ok = self.handle_message(msg).await, } } @@ -245,13 +245,13 @@ impl TestRun { let mut valid_responses = 0; for req in reqs { let req = match req { - Requests::AvailabilityFetching(req) => req, + Requests::ChunkFetching(req) => req, _ => panic!("Unexpected request"), }; let response = self.chunk_responses.get(&req.peer) .ok_or(network::RequestFailure::Refused); - if let Ok(AvailabilityFetchingResponse::Chunk(resp)) = &response { + if let Ok(ChunkFetchingResponse::Chunk(resp)) = &response { if self.valid_chunks.contains(&resp.chunk) { valid_responses += 1; } @@ -285,7 +285,7 @@ fn get_test_running_task() -> (RunningTask, mpsc::Receiver) { session_index: 0, group_index: GroupIndex(0), group: Vec::new(), - request: AvailabilityFetchingRequest { + request: ChunkFetchingRequest { candidate_hash: CandidateHash([43u8;32].into()), index: ValidatorIndex(0), }, diff --git a/polkadot/node/network/availability-distribution/src/responder.rs b/polkadot/node/network/availability-distribution/src/responder.rs index 394ddaf1c8..8e37c6cf2f 100644 --- a/polkadot/node/network/availability-distribution/src/responder.rs +++ b/polkadot/node/network/availability-distribution/src/responder.rs @@ -33,7 +33,7 @@ use crate::{LOG_TARGET, metrics::{Metrics, SUCCEEDED, FAILED, NOT_FOUND}}; /// Any errors of `answer_request` will simply be logged. pub async fn answer_request_log( ctx: &mut Context, - req: IncomingRequest, + req: IncomingRequest, metrics: &Metrics, ) -> () where @@ -59,7 +59,7 @@ where /// Returns: Ok(true) if chunk was found and served. 
pub async fn answer_request( ctx: &mut Context, - req: IncomingRequest, + req: IncomingRequest, ) -> Result where Context: SubsystemContext, @@ -84,8 +84,8 @@ where ); let response = match chunk { - None => v1::AvailabilityFetchingResponse::NoSuchChunk, - Some(chunk) => v1::AvailabilityFetchingResponse::Chunk(chunk.into()), + None => v1::ChunkFetchingResponse::NoSuchChunk, + Some(chunk) => v1::ChunkFetchingResponse::Chunk(chunk.into()), }; req.send_response(response).map_err(|_| Error::SendResponse)?; diff --git a/polkadot/node/network/availability-distribution/src/tests/state.rs b/polkadot/node/network/availability-distribution/src/tests/state.rs index d342453a0a..a227cca036 100644 --- a/polkadot/node/network/availability-distribution/src/tests/state.rs +++ b/polkadot/node/network/availability-distribution/src/tests/state.rs @@ -96,7 +96,7 @@ impl Default for TestState { let mut cores = HashMap::new(); let mut chunks = HashMap::new(); - cores.insert(relay_chain[0], + cores.insert(relay_chain[0], vec![ CoreState::Scheduled(ScheduledCore { para_id: chain_ids[0], @@ -148,7 +148,7 @@ impl Default for TestState { } impl TestState { - + /// Run, but fail after some timeout. pub async fn run(self, harness: TestHarness) { // Make sure test won't run forever. @@ -178,7 +178,7 @@ impl TestState { // // Test will fail if this does not happen until timeout. 
let mut remaining_stores = self.valid_chunks.len(); - + let TestSubsystemContextHandle { tx, mut rx } = virtual_overseer; // Spawning necessary as incoming queue can only hold a single item, we don't want to dead @@ -210,7 +210,7 @@ impl TestState { executor.spawn("Request forwarding", overseer_send( tx.clone(), - AvailabilityDistributionMessage::AvailabilityFetchingRequest(in_req) + AvailabilityDistributionMessage::ChunkFetchingRequest(in_req) ).boxed() ); } @@ -294,9 +294,9 @@ async fn overseer_recv( fn to_incoming_req( executor: &TaskExecutor, outgoing: Requests -) -> IncomingRequest { +) -> IncomingRequest { match outgoing { - Requests::AvailabilityFetching(OutgoingRequest { payload, pending_response, .. }) => { + Requests::ChunkFetching(OutgoingRequest { payload, pending_response, .. }) => { let (tx, rx): (oneshot::Sender, oneshot::Receiver<_>) = oneshot::channel(); executor.spawn("Message forwarding", async { diff --git a/polkadot/node/network/availability-recovery/Cargo.toml b/polkadot/node/network/availability-recovery/Cargo.toml index 78f0f37cda..21ae777bad 100644 --- a/polkadot/node/network/availability-recovery/Cargo.toml +++ b/polkadot/node/network/availability-recovery/Cargo.toml @@ -16,8 +16,7 @@ polkadot-primitives = { path = "../../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-node-network-protocol = { path = "../../network/protocol" } -futures-timer = "3.0.2" -streamunordered = "0.5.1" +parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] } [dev-dependencies] assert_matches = "1.4.0" @@ -29,5 +28,6 @@ smallvec = "1.5.1" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } 
+sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpers", path = "../../subsystem-test-helpers" } diff --git a/polkadot/node/network/availability-recovery/src/error.rs b/polkadot/node/network/availability-recovery/src/error.rs index d33f496917..bf9a3125b3 100644 --- a/polkadot/node/network/availability-recovery/src/error.rs +++ b/polkadot/node/network/availability-recovery/src/error.rs @@ -25,9 +25,6 @@ pub enum Error { #[error(transparent)] Subsystem(#[from] polkadot_subsystem::SubsystemError), - #[error("failed to query a chunk from store")] - CanceledQueryChunk(#[source] oneshot::Canceled), - #[error("failed to query full data from store")] CanceledQueryFullData(#[source] oneshot::Canceled), diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 21ebe392ac..3470e2c3cd 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -19,14 +19,11 @@ #![warn(missing_docs)] use std::collections::HashMap; -use std::time::Duration; -use std::pin::Pin; use futures::{channel::{oneshot, mpsc}, prelude::*, stream::FuturesUnordered}; -use futures_timer::Delay; +use futures::future::BoxFuture; use lru::LruCache; use rand::seq::SliceRandom; -use streamunordered::{StreamUnordered, StreamYield}; use polkadot_primitives::v1::{ AuthorityDiscoveryId, AvailableData, CandidateReceipt, CandidateHash, @@ -40,16 +37,16 @@ use polkadot_subsystem::{ jaeger, messages::{ AvailabilityStoreMessage, AvailabilityRecoveryMessage, AllMessages, NetworkBridgeMessage, - NetworkBridgeEvent, }, }; use polkadot_node_network_protocol::{ - peer_set::PeerSet, v1 as protocol_v1, PeerId, RequestId, UnifiedReputationChange as Rep, -}; -use polkadot_node_subsystem_util::{ - Timeout, TimeoutExt, - request_session_info_ctx, + IfDisconnected, + 
request_response::{ + self as req_res, OutgoingRequest, Recipient, Requests, + request::RequestError, + }, }; +use polkadot_node_subsystem_util::request_session_info_ctx; use polkadot_erasure_coding::{branches, branch_hash, recovery_threshold, obtain_chunks_v1}; mod error; @@ -58,78 +55,17 @@ mod tests; const LOG_TARGET: &str = "parachain::availability-recovery"; -const COST_MERKLE_PROOF_INVALID: Rep = Rep::CostMinor("Merkle proof was invalid"); -const COST_UNEXPECTED_CHUNK: Rep = Rep::CostMinor("Peer has sent an unexpected chunk"); -const COST_INVALID_AVAILABLE_DATA: Rep = Rep::CostMinor("Peer provided invalid available data"); - // How many parallel requests interaction should have going at once. const N_PARALLEL: usize = 50; // Size of the LRU cache where we keep recovered data. const LRU_SIZE: usize = 16; -// A timeout for a chunk request. -#[cfg(not(test))] -const CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_secs(3); - -#[cfg(test)] -const CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_millis(100); - -// A timeout for a full data request. -#[cfg(not(test))] -const FULL_DATA_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); - -#[cfg(test)] -const FULL_DATA_REQUEST_TIMEOUT: Duration = Duration::from_millis(100); - -// A period to poll and clean awaited data. -const AWAITED_CLEANUP_INTERVAL: Duration = Duration::from_secs(1); - /// The Availability Recovery Subsystem. pub struct AvailabilityRecoverySubsystem { fast_path: bool, } -type DataResponse = (PeerId, ValidatorIndex, T); - -/// Awaited data from the network. -enum Awaited { - Chunk(AwaitedData), - FullData(AwaitedData), -} - -impl Awaited { - fn is_canceled(&self) -> bool { - match *self { - Awaited::Chunk(ref c) => c.response.is_canceled(), - Awaited::FullData(ref fd) => fd.response.is_canceled(), - } - } - - /// Token to cancel the connection request to the validator. 
- fn token(&self) -> usize { - match *self { - Awaited::Chunk(ref c) => c.token, - Awaited::FullData(ref fd) => fd.token, - } - } -} - -/// Data we keep around for network data that we are awaiting. -struct AwaitedData { - /// Index of the validator we have requested this chunk from. - validator_index: ValidatorIndex, - - /// The hash of the candidate the chunks belongs to. - candidate_hash: CandidateHash, - - /// Token to cancel the connection request to the validator. - token: usize, - - /// Result sender. - response: oneshot::Sender>, -} - /// Accumulate all awaiting sides for some particular `AvailableData`. struct InteractionHandle { awaiting: Vec>>, @@ -141,27 +77,8 @@ enum FromInteraction { /// An interaction concluded. Concluded(CandidateHash, Result), - /// Make a request of a particular chunk from a particular validator. - MakeChunkRequest( - AuthorityDiscoveryId, - CandidateHash, - ValidatorIndex, - oneshot::Sender>, - ), - - /// Make a request of the full data from a particular validator. - MakeFullDataRequest( - AuthorityDiscoveryId, - CandidateHash, - ValidatorIndex, - oneshot::Sender>, - ), - - /// Report a peer. - ReportPeer( - PeerId, - Rep, - ), + /// Send a request on the network service. + NetworkRequest(Requests), } struct RequestFromBackersPhase { @@ -175,7 +92,10 @@ struct RequestChunksPhase { // request the chunk from them. shuffling: Vec, received_chunks: HashMap, - requesting_chunks: FuturesUnordered>>>, + requesting_chunks: FuturesUnordered, RequestError>>, + >, } struct InteractionParams { @@ -241,18 +161,16 @@ impl RequestFromBackersPhase { Some(i) => i, }; - let (tx, rx) = oneshot::channel(); - // Request data. 
- to_state.send(FromInteraction::MakeFullDataRequest( - params.validator_authority_keys[validator_index.0 as usize].clone(), - params.candidate_hash.clone(), - validator_index, - tx, - )).await?; + let (req, res) = OutgoingRequest::new( + Recipient::Authority(params.validator_authority_keys[validator_index.0 as usize].clone()), + req_res::v1::AvailableDataFetchingRequest { candidate_hash: params.candidate_hash }, + ); - match rx.timeout(FULL_DATA_REQUEST_TIMEOUT).await { - Some(Ok((peer_id, _validator_index, data))) => { + to_state.send(FromInteraction::NetworkRequest(Requests::AvailableDataFetching(req))).await?; + + match res.await { + Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => { if reconstructed_data_matches_root(params.validators.len(), ¶ms.erasure_root, &data) { to_state.send( FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data)) @@ -268,30 +186,21 @@ impl RequestFromBackersPhase { tracing::debug!( target: LOG_TARGET, candidate_hash = ?params.candidate_hash, - validator = ?peer_id, + ?validator_index, "Invalid data response", ); - to_state.send(FromInteraction::ReportPeer( - peer_id.clone(), - COST_INVALID_AVAILABLE_DATA, - )).await?; + + // it doesn't help to report the peer with req/res. } } - Some(Err(e)) => { - tracing::debug!( - target: LOG_TARGET, - err = ?e, - validator = ?params.validator_authority_keys[validator_index.0 as usize], - "A response channel was cancelled while waiting for full data", - ); - } - None => { - tracing::debug!( - target: LOG_TARGET, - validator = ?params.validator_authority_keys[validator_index.0 as usize], - "A full data request has timed out", - ); - } + Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {} + Err(e) => tracing::debug!( + target: LOG_TARGET, + candidate_hash = ?params.candidate_hash, + ?validator_index, + err = ?e, + "Error fetching full available data." 
+ ), } } } @@ -314,24 +223,39 @@ impl RequestChunksPhase { params: &InteractionParams, to_state: &mut mpsc::Sender, ) -> Result<(), mpsc::SendError> { - while self.requesting_chunks.len() < N_PARALLEL { + let max_requests = std::cmp::min(N_PARALLEL, params.threshold); + while self.requesting_chunks.len() < max_requests { if let Some(validator_index) = self.shuffling.pop() { - let (tx, rx) = oneshot::channel(); + let validator = params.validator_authority_keys[validator_index.0 as usize].clone(); tracing::trace!( target: LOG_TARGET, - validator = ?params.validator_authority_keys[validator_index.0 as usize], + ?validator, ?validator_index, candidate_hash = ?params.candidate_hash, "Requesting chunk", ); - to_state.send(FromInteraction::MakeChunkRequest( - params.validator_authority_keys[validator_index.0 as usize].clone(), - params.candidate_hash.clone(), - validator_index, - tx, - )).await?; - self.requesting_chunks.push(rx.timeout(CHUNK_REQUEST_TIMEOUT)); + // Request data. + let raw_request = req_res::v1::ChunkFetchingRequest { + candidate_hash: params.candidate_hash, + index: validator_index, + }; + + let (req, res) = OutgoingRequest::new( + Recipient::Authority(validator), + raw_request.clone(), + ); + + to_state.send(FromInteraction::NetworkRequest(Requests::ChunkFetching(req))).await?; + + self.requesting_chunks.push(Box::pin(async move { + match res.await { + Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) + => Ok(Some(chunk.recombine_into_chunk(&raw_request))), + Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None), + Err(e) => Err(e), + } + })); } else { break; } @@ -343,7 +267,6 @@ impl RequestChunksPhase { async fn wait_for_chunks( &mut self, params: &InteractionParams, - to_state: &mut mpsc::Sender, ) -> Result<(), mpsc::SendError> { // Check if the requesting chunks is not empty not to poll to completion. if self.requesting_chunks.is_empty() { @@ -353,28 +276,11 @@ impl RequestChunksPhase { // Poll for new updates from requesting_chunks. 
while let Some(request_result) = self.requesting_chunks.next().await { match request_result { - Some(Ok((peer_id, validator_index, chunk))) => { + Ok(Some(chunk)) => { // Check merkle proofs of any received chunks, and any failures should // lead to issuance of a FromInteraction::ReportPeer message. - // We need to check that the validator index matches the chunk index and - // not blindly trust the data from an untrusted peer. - if validator_index != chunk.index { - tracing::debug!( - target: LOG_TARGET, - validator = ?peer_id, - ?validator_index, - chunk_index = ?chunk.index, - "Index mismatch", - ); - to_state.send(FromInteraction::ReportPeer( - peer_id.clone(), - COST_MERKLE_PROOF_INVALID, - )).await?; - - continue; - } - + let validator_index = chunk.index; if let Ok(anticipated_hash) = branch_hash( ¶ms.erasure_root, @@ -386,47 +292,31 @@ impl RequestChunksPhase { if erasure_chunk_hash != anticipated_hash { tracing::debug!( target: LOG_TARGET, - validator = ?peer_id, ?validator_index, "Merkle proof mismatch", ); - to_state.send(FromInteraction::ReportPeer( - peer_id.clone(), - COST_MERKLE_PROOF_INVALID, - )).await?; } else { tracing::trace!( target: LOG_TARGET, - validator = ?peer_id, ?validator_index, - "Received valid Merkle proof", + "Received valid chunk.", ); self.received_chunks.insert(validator_index, chunk); } } else { tracing::debug!( target: LOG_TARGET, - validator = ?peer_id, ?validator_index, "Invalid Merkle proof", ); - to_state.send(FromInteraction::ReportPeer( - peer_id.clone(), - COST_MERKLE_PROOF_INVALID, - )).await?; } } - Some(Err(e)) => { + Ok(None) => {} + Err(e) => { tracing::debug!( target: LOG_TARGET, err = ?e, - "A response channel was cancelled while waiting for a chunk", - ); - } - None => { - tracing::debug!( - target: LOG_TARGET, - "A chunk request has timed out", + "Failure requesting chunk", ); } } @@ -453,7 +343,7 @@ impl RequestChunksPhase { erasure_root = ?params.erasure_root, received = %self.received_chunks.len(), requesting 
= %self.requesting_chunks.len(), - n_validators = %self.shuffling.len(), + n_validators = %params.validators.len(), "Data recovery is not possible", ); to_state.send(FromInteraction::Concluded( @@ -465,7 +355,7 @@ impl RequestChunksPhase { } self.launch_parallel_requests(params, to_state).await?; - self.wait_for_chunks(params, to_state).await?; + self.wait_for_chunks(params).await?; // If received_chunks has more than threshold entries, attempt to recover the data. // If that fails, or a re-encoding of it doesn't match the expected erasure root, @@ -523,10 +413,10 @@ impl RequestChunksPhase { const fn is_unavailable( received_chunks: usize, requesting_chunks: usize, - n_validators: usize, + unrequested_validators: usize, threshold: usize, ) -> bool { - received_chunks + requesting_chunks + n_validators < threshold + received_chunks + requesting_chunks + unrequested_validators < threshold } fn reconstructed_data_matches_root( @@ -585,19 +475,6 @@ struct State { /// A recent block hash for which state should be available. live_block_hash: Hash, - /// We are waiting for these validators to connect and as soon as they - /// do, request the needed data we are waiting for. - discovering_validators: HashMap>, - - /// Requests that we have issued to the already connected validators - /// about the data we are interested in. - live_requests: HashMap, - - /// Derive request ids from this. - next_request_id: RequestId, - - connecting_validators: StreamUnordered>, - /// interaction communication. This is cloned and given to interactions that are spun up. 
from_interaction_tx: mpsc::Sender, @@ -613,14 +490,10 @@ impl Default for State { let (from_interaction_tx, from_interaction_rx) = mpsc::channel(16); Self { - from_interaction_tx, - from_interaction_rx, interactions: HashMap::new(), live_block_hash: Hash::default(), - discovering_validators: HashMap::new(), - live_requests: HashMap::new(), - next_request_id: 0, - connecting_validators: StreamUnordered::new(), + from_interaction_tx, + from_interaction_rx, availability_lru: LruCache::new(LRU_SIZE), } } @@ -659,15 +532,6 @@ async fn handle_signal( } } -/// Report a reputation change for a peer. -async fn report_peer( - ctx: &mut impl SubsystemContext, - peer: PeerId, - rep: Rep, -) { - ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(peer, rep))).await; -} - /// Machinery around launching interactions into the background. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] async fn launch_interaction( @@ -797,21 +661,6 @@ async fn handle_recover( } } -/// Queries a chunk from av-store. -#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] -async fn query_chunk( - ctx: &mut impl SubsystemContext, - candidate_hash: CandidateHash, - validator_index: ValidatorIndex, -) -> error::Result> { - let (tx, rx) = oneshot::channel(); - ctx.send_message(AllMessages::AvailabilityStore( - AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx), - )).await; - - Ok(rx.await.map_err(error::Error::CanceledQueryChunk)?) -} - /// Queries a chunk from av-store. 
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] async fn query_full_data( @@ -857,336 +706,17 @@ async fn handle_from_interaction( state.availability_lru.put(candidate_hash, result); } - FromInteraction::MakeChunkRequest(id, candidate_hash, validator_index, response) => { - let (tx, rx) = mpsc::channel(2); - - let message = NetworkBridgeMessage::ConnectToValidators { - validator_ids: vec![id.clone()], - peer_set: PeerSet::Validation, - connected: tx, - }; - - ctx.send_message(AllMessages::NetworkBridge(message)).await; - - let token = state.connecting_validators.push(rx); - - state.discovering_validators.entry(id).or_default().push(Awaited::Chunk(AwaitedData { - validator_index, - candidate_hash, - token, - response, - })); - } - FromInteraction::MakeFullDataRequest(id, candidate_hash, validator_index, response) => { - let (tx, rx) = mpsc::channel(2); - - let message = NetworkBridgeMessage::ConnectToValidators { - validator_ids: vec![id.clone()], - peer_set: PeerSet::Validation, - connected: tx, - }; - - ctx.send_message(AllMessages::NetworkBridge(message)).await; - - let token = state.connecting_validators.push(rx); - - state.discovering_validators.entry(id).or_default().push(Awaited::FullData(AwaitedData { - validator_index, - candidate_hash, - token, - response, - })); - } - FromInteraction::ReportPeer(peer_id, rep) => { - report_peer(ctx, peer_id, rep).await; + FromInteraction::NetworkRequest(request) => { + ctx.send_message(NetworkBridgeMessage::SendRequests( + vec![request], + IfDisconnected::TryConnect, + ).into()).await; } } Ok(()) } -/// Handles a network bridge update. 
-#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn handle_network_update( - state: &mut State, - ctx: &mut impl SubsystemContext, - update: NetworkBridgeEvent, -) -> error::Result<()> { - match update { - NetworkBridgeEvent::PeerMessage(peer, message) => { - match message { - protocol_v1::AvailabilityRecoveryMessage::RequestChunk( - request_id, - candidate_hash, - validator_index, - ) => { - // Issue a - // AvailabilityStore::QueryChunk(candidate-hash, validator_index, response) - // message. - let chunk = query_chunk(ctx, candidate_hash, validator_index).await?; - - tracing::trace!( - target: LOG_TARGET, - data_set = %chunk.is_some(), - %request_id, - ?candidate_hash, - validator_index = validator_index.0, - "Responding to chunk request", - ); - - // Whatever the result, issue an - // AvailabilityRecoveryV1Message::Chunk(r_id, response) message. - let wire_message = protocol_v1::AvailabilityRecoveryMessage::Chunk( - request_id, - chunk, - ); - - ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( - vec![peer], - protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message), - ), - )).await; - } - protocol_v1::AvailabilityRecoveryMessage::Chunk(request_id, chunk) => { - match state.live_requests.remove(&request_id) { - None => { - tracing::debug!( - target: LOG_TARGET, - ?peer, - "Received unexpected chunk response", - ); - // If there doesn't exist one, report the peer and return. - report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await; - } - Some((peer_id, Awaited::Chunk(awaited_chunk))) if peer_id == peer => { - tracing::trace!( - target: LOG_TARGET, - data_set = %chunk.is_some(), - %request_id, - candidate_hash = ?awaited_chunk.candidate_hash, - validator_index = awaited_chunk.validator_index.0, - "Received chunk response", - ); - - // If there exists an entry under r_id, remove it. - // Send the chunk response on the awaited_chunk for the interaction to handle. 
- if let Some(chunk) = chunk { - if awaited_chunk.response.send( - (peer_id, awaited_chunk.validator_index, chunk) - ).is_err() { - tracing::debug!( - target: LOG_TARGET, - "A sending side of the recovery request is closed", - ); - } - } - } - Some(a) => { - tracing::debug!( - target: LOG_TARGET, - ?peer, - "Received unexpected chunk response", - ); - // If the peer in the entry doesn't match the sending peer, - // reinstate the entry, report the peer, and return - state.live_requests.insert(request_id, a); - report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await; - } - } - } - protocol_v1::AvailabilityRecoveryMessage::RequestFullData( - request_id, - candidate_hash, - ) => { - // Issue a - // AvailabilityStore::QueryAvailableData(candidate-hash, response) - // message. - let full_data = query_full_data(ctx, candidate_hash).await?; - - tracing::trace!( - target: LOG_TARGET, - data_set = full_data.is_some(), - %request_id, - ?candidate_hash, - "Responding to full data request", - ); - - // Whatever the result, issue an - // AvailabilityRecoveryV1Message::FullData(r_id, response) message. - let wire_message = protocol_v1::AvailabilityRecoveryMessage::FullData( - request_id, - full_data, - ); - - ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( - vec![peer], - protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message), - ), - )).await; - } - protocol_v1::AvailabilityRecoveryMessage::FullData(request_id, data) => { - match state.live_requests.remove(&request_id) { - None => { - // If there doesn't exist one, report the peer and return. 
- tracing::debug!( - target: LOG_TARGET, - ?peer, - "Received unexpected full data response", - ); - report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await; - } - Some((peer_id, Awaited::FullData(awaited))) if peer_id == peer => { - tracing::trace!( - target: LOG_TARGET, - %request_id, - candidate_hash = ?awaited.candidate_hash, - data_set = %data.is_some(), - "Received full data response", - ); - - // If there exists an entry under r_id, remove it. - // Send the response on the awaited for the interaction to handle. - if let Some(data) = data { - if awaited.response.send((peer_id, awaited.validator_index, data)).is_err() { - tracing::debug!( - target: LOG_TARGET, - "A sending side of the recovery request is closed", - ); - } - } - } - Some(a) => { - // If the peer in the entry doesn't match the sending peer, - // reinstate the entry, report the peer, and return - tracing::debug!( - target: LOG_TARGET, - ?peer, - "Received unexpected full data response", - ); - state.live_requests.insert(request_id, a); - report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await; - } - } - } - } - } - // We do not really need to track the peers' views in this subsystem - // since the peers are _required_ to have the data we are interested in. - NetworkBridgeEvent::PeerViewChange(_, _) => {} - NetworkBridgeEvent::OurViewChange(_) => {} - // All peer connections are handled via validator discovery API. - NetworkBridgeEvent::PeerConnected(_, _) => {} - NetworkBridgeEvent::PeerDisconnected(_) => {} - } - - Ok(()) -} - -/// Issues a request to the validator we've been waiting for to connect to us. 
-async fn issue_request( - state: &mut State, - ctx: &mut impl SubsystemContext, - peer_id: PeerId, - awaited: Awaited, -) -> error::Result<()> { - let request_id = state.next_request_id; - state.next_request_id += 1; - - let wire_message = match awaited { - Awaited::Chunk(ref awaited_chunk) => { - tracing::trace!( - target: LOG_TARGET, - %request_id, - %peer_id, - candidate_hash = ?awaited_chunk.candidate_hash, - validator_index = %awaited_chunk.validator_index.0, - "Requesting chunk", - ); - - protocol_v1::AvailabilityRecoveryMessage::RequestChunk( - request_id, - awaited_chunk.candidate_hash, - awaited_chunk.validator_index, - ) - } - Awaited::FullData(ref awaited_data) => { - tracing::trace!( - target: LOG_TARGET, - %request_id, - %peer_id, - candidate_hash = ?awaited_data.candidate_hash, - validator_index = %awaited_data.validator_index.0, - "Requesting full data", - ); - - protocol_v1::AvailabilityRecoveryMessage::RequestFullData( - request_id, - awaited_data.candidate_hash, - ) - } - }; - - - - ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( - vec![peer_id.clone()], - protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message), - ), - )).await; - - state.live_requests.insert(request_id, (peer_id, awaited)); - - Ok(()) -} - -/// Handles a newly connected validator in the context of some relay leaf. 
-async fn handle_validator_connected( - state: &mut State, - ctx: &mut impl SubsystemContext, - authority_id: AuthorityDiscoveryId, - peer_id: PeerId, -) -> error::Result<()> { - tracing::trace!( - target: LOG_TARGET, - ?peer_id, - ?authority_id, - "Validator connected", - ); - if let Some(discovering) = state.discovering_validators.remove(&authority_id) { - for awaited in discovering { - issue_request(state, ctx, peer_id.clone(), awaited).await?; - } - } - - Ok(()) -} - -/// Awaited info that `State` holds has to be cleaned up -/// periodically since there is no way `Interaction` can communicate -/// a timedout request. -fn cleanup_awaited(state: &mut State) { - let mut removed_tokens = Vec::new(); - - for (_, v) in state.discovering_validators.iter_mut() { - v.retain(|e| if e.is_canceled() { - removed_tokens.push(e.token()); - false - } else { - true - }); - } - - for token in removed_tokens { - Pin::new(&mut state.connecting_validators).remove(token); - } - - state.discovering_validators.retain(|_, v| !v.is_empty()); - state.live_requests.retain(|_, v| !v.1.is_canceled()); -} - impl AvailabilityRecoverySubsystem { /// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to request data from backers. pub fn with_fast_path() -> Self { @@ -1204,40 +734,8 @@ impl AvailabilityRecoverySubsystem { ) -> SubsystemResult<()> { let mut state = State::default(); - let awaited_cleanup_interval = futures::stream::repeat(()).then(|_| async move { - Delay::new(AWAITED_CLEANUP_INTERVAL).await; - }); - - futures::pin_mut!(awaited_cleanup_interval); - loop { - futures::select_biased! 
{ - _v = awaited_cleanup_interval.next() => { - cleanup_awaited(&mut state); - } - v = state.connecting_validators.next() => { - if let Some((v, token)) = v { - match v { - StreamYield::Item(v) => { - if let Err(e) = handle_validator_connected( - &mut state, - &mut ctx, - v.0, - v.1, - ).await { - tracing::warn!( - target: LOG_TARGET, - err = ?e, - "Failed to handle a newly connected validator", - ); - } - } - StreamYield::Finished(_) => { - Pin::new(&mut state.connecting_validators).remove(token); - } - } - } - } + futures::select! { v = ctx.recv().fuse() => { match v? { FromOverseer::Signal(signal) => if handle_signal( @@ -1269,17 +767,20 @@ impl AvailabilityRecoverySubsystem { ); } } - AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(event) => { - if let Err(e) = handle_network_update( - &mut state, - &mut ctx, - event, - ).await { - tracing::warn!( - target: LOG_TARGET, - err = ?e, - "Error handling a network bridge update", - ); + AvailabilityRecoveryMessage::AvailableDataFetchingRequest(req) => { + match query_full_data(&mut ctx, req.payload.candidate_hash).await { + Ok(res) => { + let _ = req.send_response(res.into()); + } + Err(e) => { + tracing::debug!( + target: LOG_TARGET, + err = ?e, + "Failed to query available data.", + ); + + let _ = req.send_response(None.into()); + } } } } diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs index bbe63dc8a0..2dda6cbe59 100644 --- a/polkadot/node/network/availability-recovery/src/tests.rs +++ b/polkadot/node/network/availability-recovery/src/tests.rs @@ -22,6 +22,8 @@ use futures_timer::Delay; use assert_matches::assert_matches; use smallvec::smallvec; +use parity_scale_codec::Encode; + use super::*; use polkadot_primitives::v1::{ @@ -30,7 +32,7 @@ use polkadot_primitives::v1::{ use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks}; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_subsystem_testhelpers as 
test_helpers; -use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent}, jaeger}; +use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger}; type VirtualOverseer = test_helpers::TestSubsystemContextHandle; @@ -139,11 +141,10 @@ async fn overseer_recv( use sp_keyring::Sr25519Keyring; #[derive(Debug, Clone)] -enum HasAvailableData { +enum Has { No, Yes, Timeout, - Other(AvailableData), } #[derive(Clone)] @@ -151,12 +152,10 @@ struct TestState { validators: Vec, validator_public: Vec, validator_authority_id: Vec, - validator_peer_id: Vec, current: Hash, candidate: CandidateReceipt, session_index: SessionIndex, - persisted_validation_data: PersistedValidationData, available_data: AvailableData, @@ -164,6 +163,26 @@ struct TestState { } impl TestState { + fn threshold(&self) -> usize { + recovery_threshold(self.validators.len()).unwrap() + } + + fn impossibility_threshold(&self) -> usize { + self.validators.len() - self.threshold() + 1 + } + + fn all_have(&self) -> Vec { + (0..self.validators.len()).map(|_| Has::Yes).collect() + } + + fn all_dont_have(&self) -> Vec { + (0..self.validators.len()).map(|_| Has::No).collect() + } + + fn all_timeout(&self) -> Vec { + (0..self.validators.len()).map(|_| Has::Timeout).collect() + } + async fn test_runtime_api( &self, virtual_overseer: &mut VirtualOverseer, @@ -191,238 +210,104 @@ impl TestState { ); } - async fn test_connect_to_all_validators( - &self, - virtual_overseer: &mut VirtualOverseer, - ) { - self.test_connect_to_validators(virtual_overseer, self.validator_public.len()).await; - } - - async fn test_connect_to_validators( - &self, - virtual_overseer: &mut VirtualOverseer, - n: usize, - ) { - // Channels by AuthorityDiscoveryId to send results to. - // Gather them here and send in batch after the loop not to race. - let mut results = HashMap::new(); - - for _ in 0..n { - // Connect to shuffled validators one by one. 
- assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ConnectToValidators { - validator_ids, - connected, - .. - } - ) => { - for validator_id in validator_ids { - let idx = self.validator_authority_id - .iter() - .position(|x| *x == validator_id) - .unwrap(); - - results.insert( - ( - self.validator_authority_id[idx].clone(), - self.validator_peer_id[idx].clone(), - ), - connected.clone(), - ); - } - } - ); - } - - for (k, mut v) in results.into_iter() { - v.send(k).await.unwrap(); - } - } - async fn test_chunk_requests( &self, candidate_hash: CandidateHash, virtual_overseer: &mut VirtualOverseer, + n: usize, + who_has: &[Has], ) { - for _ in 0..self.validator_public.len() { + // arbitrary order. + for _ in 0..n { // Receive a request for a chunk. assert_matches!( overseer_recv(virtual_overseer).await, AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( - _peers, - protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message), + NetworkBridgeMessage::SendRequests( + mut requests, + IfDisconnected::TryConnect, ) ) => { - let (request_id, validator_index) = assert_matches!( - wire_message, - protocol_v1::AvailabilityRecoveryMessage::RequestChunk( - request_id, - candidate_hash_recvd, - validator_index, - ) => { - assert_eq!(candidate_hash_recvd, candidate_hash); - (request_id, validator_index) - } - ); + assert_eq!(requests.len(), 1); - overseer_send( - virtual_overseer, - AvailabilityRecoveryMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - self.validator_peer_id[validator_index.0 as usize].clone(), - protocol_v1::AvailabilityRecoveryMessage::Chunk( - request_id, - Some(self.chunks[validator_index.0 as usize].clone()), + assert_matches!( + requests.pop().unwrap(), + Requests::ChunkFetching(req) => { + assert_eq!(req.payload.candidate_hash, candidate_hash); + + let validator_index = req.payload.index.0 as usize; + let available_data = match 
who_has[validator_index] { + Has::No => Ok(None), + Has::Yes => Ok(Some(self.chunks[validator_index].clone().into())), + Has::Timeout => { + Err(sc_network::RequestFailure::Network( + sc_network::OutboundFailure::Timeout + )) + } + }; + + let _ = req.pending_response.send( + available_data.map(|r| + req_res::v1::ChunkFetchingResponse::from(r).encode() ) - ) - ) - ).await; - } - ); - } - } - - async fn test_faulty_chunk_requests( - &self, - candidate_hash: CandidateHash, - virtual_overseer: &mut VirtualOverseer, - faulty: &[bool], - ) { - for _ in 0..self.validator_public.len() { - // Receive a request for a chunk. - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( - _peers, - protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message), + ); + } ) - ) => { - let (request_id, validator_index) = assert_matches!( - wire_message, - protocol_v1::AvailabilityRecoveryMessage::RequestChunk( - request_id, - candidate_hash_recvd, - validator_index, - ) => { - assert_eq!(candidate_hash_recvd, candidate_hash); - (request_id, validator_index) - } - ); - - overseer_send( - virtual_overseer, - AvailabilityRecoveryMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - self.validator_peer_id[validator_index.0 as usize].clone(), - protocol_v1::AvailabilityRecoveryMessage::Chunk( - request_id, - Some(self.chunks[validator_index.0 as usize].clone()), - ) - ) - ) - ).await; } ); } - - for i in 0..self.validator_public.len() { - if faulty[i] { - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep, - ) - ) => { - assert_eq!(rep, COST_MERKLE_PROOF_INVALID); - - // These may arrive in any order since the interaction implementation - // uses `FuturesUnordered`. 
- assert!(self.validator_peer_id.iter().find(|p| **p == peer).is_some()); - } - ); - } - } } async fn test_full_data_requests( &self, candidate_hash: CandidateHash, virtual_overseer: &mut VirtualOverseer, - who_has: &[HasAvailableData], + who_has: &[Has], ) { - for _ in 0..self.validator_public.len() { - self.test_connect_to_validators(virtual_overseer, 1).await; - + for _ in 0..self.validators.len() { // Receive a request for a chunk. assert_matches!( overseer_recv(virtual_overseer).await, AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( - peers, - protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message), + NetworkBridgeMessage::SendRequests( + mut requests, + IfDisconnected::TryConnect, ) ) => { - let (request_id, validator_index) = assert_matches!( - wire_message, - protocol_v1::AvailabilityRecoveryMessage::RequestFullData( - request_id, - candidate_hash_recvd, - ) => { - assert_eq!(candidate_hash_recvd, candidate_hash); - assert_eq!(peers.len(), 1); + assert_eq!(requests.len(), 1); - let validator_index = self.validator_peer_id.iter().position(|p| p == &peers[0]).unwrap(); - (request_id, validator_index) - } - ); + assert_matches!( + requests.pop().unwrap(), + Requests::AvailableDataFetching(req) => { + assert_eq!(req.payload.candidate_hash, candidate_hash); + let validator_index = self.validator_authority_id + .iter() + .position(|a| Recipient::Authority(a.clone()) == req.peer) + .unwrap(); - let available_data = match who_has[validator_index] { - HasAvailableData::No => Some(None), - HasAvailableData::Yes => Some(Some(self.available_data.clone())), - HasAvailableData::Timeout => None, - HasAvailableData::Other(ref other) => Some(Some(other.clone())), - }; - - if let Some(maybe_data) = available_data { - overseer_send( - virtual_overseer, - AvailabilityRecoveryMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - self.validator_peer_id[validator_index].clone(), - 
protocol_v1::AvailabilityRecoveryMessage::FullData( - request_id, - maybe_data, - ) - ) - ) - ).await; - } - - match who_has[validator_index] { - HasAvailableData::Yes => break, // done - HasAvailableData::No => {} - HasAvailableData::Timeout => { Delay::new(FULL_DATA_REQUEST_TIMEOUT).await } - HasAvailableData::Other(_) => { - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - p, - rep, - ) - ) => { - assert_eq!(p, self.validator_peer_id[validator_index]); - assert_eq!(rep, COST_INVALID_AVAILABLE_DATA); + let available_data = match who_has[validator_index] { + Has::No => Ok(None), + Has::Yes => Ok(Some(self.available_data.clone())), + Has::Timeout => { + Err(sc_network::RequestFailure::Network( + sc_network::OutboundFailure::Timeout + )) } + }; + + let _ = req.pending_response.send( + available_data.map(|r| + req_res::v1::AvailableDataFetchingResponse::from(r).encode() + ) ); + + match who_has[validator_index].clone() { + Has::Yes => break, // done + Has::No => {} + Has::Timeout => {} + } } - } + ) } ); } @@ -477,9 +362,6 @@ impl Default for TestState { let validator_public = validator_pubkeys(&validators); let validator_authority_id = validator_authority_id(&validators); - let validator_peer_id = std::iter::repeat_with(|| PeerId::random()) - .take(validator_public.len()) - .collect(); let current = Hash::repeat_byte(1); @@ -516,7 +398,6 @@ impl Default for TestState { validators, validator_public, validator_authority_id, - validator_peer_id, current, candidate, session_index, @@ -556,11 +437,14 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() { test_state.test_runtime_api(&mut virtual_overseer).await; - test_state.test_connect_to_all_validators(&mut virtual_overseer).await; - let candidate_hash = test_state.candidate.hash(); - test_state.test_chunk_requests(candidate_hash, &mut virtual_overseer).await; + test_state.test_chunk_requests( + candidate_hash, + &mut 
virtual_overseer, + test_state.threshold(), + &test_state.all_have(), + ).await; // Recovered data should match the original one. assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data); @@ -575,7 +459,7 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() { overseer_send( &mut virtual_overseer, AvailabilityRecoveryMessage::RecoverAvailableData( - new_candidate, + new_candidate.clone(), test_state.session_index, None, tx, @@ -584,7 +468,12 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() { test_state.test_runtime_api(&mut virtual_overseer).await; - test_state.test_connect_to_all_validators(&mut virtual_overseer).await; + test_state.test_chunk_requests( + new_candidate.hash(), + &mut virtual_overseer, + test_state.impossibility_threshold(), + &test_state.all_dont_have(), + ).await; // A request times out with `Unavailable` error. assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable); @@ -619,11 +508,15 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk ).await; test_state.test_runtime_api(&mut virtual_overseer).await; - test_state.test_connect_to_all_validators(&mut virtual_overseer).await; let candidate_hash = test_state.candidate.hash(); - test_state.test_chunk_requests(candidate_hash, &mut virtual_overseer).await; + test_state.test_chunk_requests( + candidate_hash, + &mut virtual_overseer, + test_state.threshold(), + &test_state.all_have(), + ).await; // Recovered data should match the original one. 
assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data); @@ -638,7 +531,7 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk overseer_send( &mut virtual_overseer, AvailabilityRecoveryMessage::RecoverAvailableData( - new_candidate, + new_candidate.clone(), test_state.session_index, None, tx, @@ -647,7 +540,12 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk test_state.test_runtime_api(&mut virtual_overseer).await; - test_state.test_connect_to_all_validators(&mut virtual_overseer).await; + test_state.test_chunk_requests( + new_candidate.hash(), + &mut virtual_overseer, + test_state.impossibility_threshold(), + &test_state.all_dont_have(), + ).await; // A request times out with `Unavailable` error. assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable); @@ -682,7 +580,6 @@ fn bad_merkle_path_leads_to_recovery_error() { ).await; test_state.test_runtime_api(&mut virtual_overseer).await; - test_state.test_connect_to_all_validators(&mut virtual_overseer).await; let candidate_hash = test_state.candidate.hash(); @@ -691,17 +588,13 @@ fn bad_merkle_path_leads_to_recovery_error() { test_state.chunks[1].chunk = vec![1; 32]; test_state.chunks[2].chunk = vec![2; 32]; test_state.chunks[3].chunk = vec![3; 32]; + test_state.chunks[4].chunk = vec![4; 32]; - let mut faulty = vec![false; test_state.chunks.len()]; - faulty[0] = true; - faulty[1] = true; - faulty[2] = true; - faulty[3] = true; - - test_state.test_faulty_chunk_requests( + test_state.test_chunk_requests( candidate_hash, &mut virtual_overseer, - &faulty, + test_state.impossibility_threshold(), + &test_state.all_have(), ).await; // A request times out with `Unavailable` error. 
@@ -738,8 +631,6 @@ fn wrong_chunk_index_leads_to_recovery_error() { test_state.test_runtime_api(&mut virtual_overseer).await; - test_state.test_connect_to_all_validators(&mut virtual_overseer).await; - let candidate_hash = test_state.candidate.hash(); // These chunks should fail the index check as they don't have the correct index for validator. @@ -748,13 +639,14 @@ fn wrong_chunk_index_leads_to_recovery_error() { test_state.chunks[3] = test_state.chunks[0].clone(); test_state.chunks[4] = test_state.chunks[0].clone(); - let mut faulty = vec![true; test_state.chunks.len()]; - faulty[0] = false; + let mut have = test_state.all_dont_have(); + have[0] = Has::No; - test_state.test_faulty_chunk_requests( + test_state.test_chunk_requests( candidate_hash, &mut virtual_overseer, - &faulty, + test_state.impossibility_threshold(), + &have, ).await; // A request times out with `Unavailable` error as there are no good peers. @@ -809,14 +701,14 @@ fn invalid_erasure_coding_leads_to_invalid_error() { test_state.test_runtime_api(&mut virtual_overseer).await; - test_state.test_connect_to_all_validators(&mut virtual_overseer).await; - test_state.test_chunk_requests( candidate_hash, &mut virtual_overseer, + test_state.threshold(), + &test_state.all_have(), ).await; - // A request times out with `Unavailable` error as there are no good peers. + // f+1 'valid' chunks can't produce correct data. 
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Invalid); }); } @@ -852,8 +744,8 @@ fn fast_path_backing_group_recovers() { let candidate_hash = test_state.candidate.hash(); - let mut who_has: Vec<_> = (0..test_state.validators.len()).map(|_| HasAvailableData::No).collect(); - who_has[3] = HasAvailableData::Yes; + let mut who_has = test_state.all_dont_have(); + who_has[3] = Has::Yes; test_state.test_full_data_requests( candidate_hash, @@ -866,51 +758,6 @@ fn fast_path_backing_group_recovers() { }); } -#[test] -fn wrong_data_from_fast_path_peer_leads_to_punishment() { - let test_state = TestState::default(); - - test_harness_fast_path(|test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - - overseer_signal( - &mut virtual_overseer, - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))], - deactivated: smallvec![], - }), - ).await; - - let (tx, _rx) = oneshot::channel(); - - overseer_send( - &mut virtual_overseer, - AvailabilityRecoveryMessage::RecoverAvailableData( - test_state.candidate.clone(), - test_state.session_index, - Some(GroupIndex(0)), - tx, - ) - ).await; - - test_state.test_runtime_api(&mut virtual_overseer).await; - - let candidate_hash = test_state.candidate.hash(); - - let mut a = test_state.available_data.clone(); - a.pov = Arc::new(PoV { block_data: BlockData(vec![69; 420]) }); - - let who_has: Vec<_> = (0..test_state.validators.len()).map(|_| HasAvailableData::Other(a.clone())).collect(); - - // This function implicitly punishes. - test_state.test_full_data_requests( - candidate_hash, - &mut virtual_overseer, - &who_has, - ).await; - }); -} - #[test] fn no_answers_in_fast_path_causes_chunk_requests() { let test_state = TestState::default(); @@ -943,9 +790,9 @@ fn no_answers_in_fast_path_causes_chunk_requests() { let candidate_hash = test_state.candidate.hash(); // mix of timeout and no. 
- let mut who_has: Vec<_> = (0..test_state.validators.len()).map(|_| HasAvailableData::Timeout).collect(); - who_has[0] = HasAvailableData::No; - who_has[3] = HasAvailableData::No; + let mut who_has = test_state.all_timeout(); + who_has[0] = Has::No; + who_has[3] = Has::No; test_state.test_full_data_requests( candidate_hash, @@ -953,8 +800,12 @@ fn no_answers_in_fast_path_causes_chunk_requests() { &who_has, ).await; - test_state.test_connect_to_all_validators(&mut virtual_overseer).await; - test_state.test_chunk_requests(candidate_hash, &mut virtual_overseer).await; + test_state.test_chunk_requests( + candidate_hash, + &mut virtual_overseer, + test_state.threshold(), + &test_state.all_have(), + ).await; // Recovered data should match the original one. assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data); diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 9516ecd797..1c5b212165 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -686,7 +686,6 @@ mod tests { use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal}; use polkadot_subsystem::messages::{ - AvailabilityRecoveryMessage, ApprovalDistributionMessage, BitfieldDistributionMessage, PoVDistributionMessage, @@ -883,13 +882,6 @@ mod tests { ) if e == event.focus().expect("could not focus message") ); - assert_matches!( - virtual_overseer.recv().await, - AllMessages::AvailabilityRecovery( - AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(e) - ) if e == event.focus().expect("could not focus message") - ); - assert_matches!( virtual_overseer.recv().await, AllMessages::BitfieldDistribution( @@ -1604,7 +1596,7 @@ mod tests { fn spread_event_to_subsystems_is_up_to_date() { // Number of subsystems expected to be interested in a network event, // and hence the network event broadcasted to. 
- const EXPECTED_COUNT: usize = 5; + const EXPECTED_COUNT: usize = 4; let mut cnt = 0_usize; for msg in AllMessages::dispatch_iter(NetworkBridgeEvent::PeerDisconnected(PeerId::random())) { @@ -1616,7 +1608,7 @@ mod tests { AllMessages::CollatorProtocol(_) => unreachable!("Not interested in network events"), AllMessages::StatementDistribution(_) => { cnt += 1; } AllMessages::AvailabilityDistribution(_) => unreachable!("Not interested in network events"), - AllMessages::AvailabilityRecovery(_) => { cnt += 1; } + AllMessages::AvailabilityRecovery(_) => unreachable!("Not interested in network events"), AllMessages::BitfieldDistribution(_) => { cnt += 1; } AllMessages::BitfieldSigning(_) => unreachable!("Not interested in network events"), AllMessages::Provisioner(_) => unreachable!("Not interested in network events"), diff --git a/polkadot/node/network/bridge/src/multiplexer.rs b/polkadot/node/network/bridge/src/multiplexer.rs index 3901a91078..7ff7187cf1 100644 --- a/polkadot/node/network/bridge/src/multiplexer.rs +++ b/polkadot/node/network/bridge/src/multiplexer.rs @@ -131,9 +131,9 @@ fn multiplex_single( }: network::IncomingRequest, ) -> Result { let r = match p { - Protocol::AvailabilityFetching => From::from(IncomingRequest::new( + Protocol::ChunkFetching => From::from(IncomingRequest::new( peer, - decode_with_peer::(peer, payload)?, + decode_with_peer::(peer, payload)?, pending_response, )), Protocol::CollationFetching => From::from(IncomingRequest::new( @@ -141,6 +141,11 @@ fn multiplex_single( decode_with_peer::(peer, payload)?, pending_response, )), + Protocol::AvailableDataFetching => From::from(IncomingRequest::new( + peer, + decode_with_peer::(peer, payload)?, + pending_response, + )), }; Ok(r) } diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs index 664ff25bd2..ee5d3398e6 100644 --- a/polkadot/node/network/protocol/src/lib.rs +++ b/polkadot/node/network/protocol/src/lib.rs @@ -23,7 +23,7 @@ use 
polkadot_primitives::v1::{Hash, BlockNumber}; use parity_scale_codec::{Encode, Decode}; use std::{fmt, collections::HashMap}; -pub use sc_network::PeerId; +pub use sc_network::{PeerId, IfDisconnected}; #[doc(hidden)] pub use polkadot_node_jaeger as jaeger; #[doc(hidden)] @@ -38,9 +38,6 @@ pub mod peer_set; /// Request/response protocols used in Polkadot. pub mod request_response; -/// A unique identifier of a request. -pub type RequestId = u64; - /// A version of the protocol. pub type ProtocolVersion = u32; @@ -288,35 +285,17 @@ impl View { /// v1 protocol types. pub mod v1 { + use parity_scale_codec::{Encode, Decode}; use std::convert::TryFrom; - use parity_scale_codec::{Decode, Encode}; - - use super::RequestId; + use polkadot_primitives::v1::{ + CandidateIndex, CollatorId, CompressedPoV, Hash, Id as ParaId, SignedAvailabilityBitfield, + CollatorSignature, + }; use polkadot_node_primitives::{ approval::{IndirectAssignmentCert, IndirectSignedApprovalVote}, SignedFullStatement, }; - use polkadot_primitives::v1::{ - AvailableData, CandidateHash, CandidateIndex, CollatorId, CompressedPoV, - CollatorSignature, ErasureChunk, Hash, Id as ParaId, SignedAvailabilityBitfield, - ValidatorIndex, - }; - - /// Network messages used by the availability recovery subsystem. - #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] - pub enum AvailabilityRecoveryMessage { - /// Request a chunk for a given candidate hash and validator index. - RequestChunk(RequestId, CandidateHash, ValidatorIndex), - /// Respond with chunk for a given candidate hash and validator index. - /// The response may be `None` if the requestee does not have the chunk. - Chunk(RequestId, Option), - /// Request full data for a given candidate hash. - RequestFullData(RequestId, CandidateHash), - /// Respond with full data for a given candidate hash. - /// The response may be `None` if the requestee does not have the data. 
- FullData(RequestId, Option), - } /// Network messages used by the bitfield distribution subsystem. #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] @@ -388,11 +367,8 @@ pub mod v1 { /// Statement distribution messages #[codec(index = 3)] StatementDistribution(StatementDistributionMessage), - /// Availability recovery messages - #[codec(index = 4)] - AvailabilityRecovery(AvailabilityRecoveryMessage), /// Approval distribution messages - #[codec(index = 5)] + #[codec(index = 4)] ApprovalDistribution(ApprovalDistributionMessage), } @@ -400,7 +376,6 @@ pub mod v1 { impl_try_from!(ValidationProtocol, PoVDistribution, PoVDistributionMessage); impl_try_from!(ValidationProtocol, StatementDistribution, StatementDistributionMessage); impl_try_from!(ValidationProtocol, ApprovalDistribution, ApprovalDistributionMessage); - impl_try_from!(ValidationProtocol, AvailabilityRecovery, AvailabilityRecoveryMessage); /// All network messages on the collation peer-set. #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] diff --git a/polkadot/node/network/protocol/src/request_response/mod.rs b/polkadot/node/network/protocol/src/request_response/mod.rs index 4fc6412d04..71914fb88e 100644 --- a/polkadot/node/network/protocol/src/request_response/mod.rs +++ b/polkadot/node/network/protocol/src/request_response/mod.rs @@ -56,10 +56,12 @@ pub mod v1; /// within protocols. #[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, EnumIter)] pub enum Protocol { - /// Protocol for availability fetching, used by availability distribution. - AvailabilityFetching, + /// Protocol for chunk fetching, used by availability distribution and availability recovery. + ChunkFetching, /// Protocol for fetching collations from collators. CollationFetching, + /// Protocol for fetching available data. + AvailableDataFetching, } /// Default request timeout in seconds. 
@@ -67,7 +69,7 @@ pub enum Protocol { /// When decreasing this value, take into account that the very first request might need to open a /// connection, which can be slow. If this causes problems, we should ensure connectivity via peer /// sets. -const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(3); +const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(3); /// Request timeout where we can assume the connection is already open (e.g. we have peers in a /// peer set as well). @@ -90,7 +92,7 @@ impl Protocol { let p_name = self.into_protocol_name(); let (tx, rx) = mpsc::channel(self.get_channel_size()); let cfg = match self { - Protocol::AvailabilityFetching => RequestResponseConfig { + Protocol::ChunkFetching => RequestResponseConfig { name: p_name, max_request_size: 10_000, max_response_size: 10_000_000, @@ -105,6 +107,14 @@ impl Protocol { request_timeout: DEFAULT_REQUEST_TIMEOUT_CONNECTED, inbound_queue: Some(tx), }, + Protocol::AvailableDataFetching => RequestResponseConfig { + name: p_name, + max_request_size: 1_000, + // Available data size is dominated by the PoV size. + max_response_size: 30_000_000, + request_timeout: DEFAULT_REQUEST_TIMEOUT, + inbound_queue: Some(tx), + }, }; (rx, cfg) } @@ -117,9 +127,12 @@ impl Protocol { // times (due to network delays), 100 seems big enough to accomodate for "bursts", // assuming we can service requests relatively quickly, which would need to be measured // as well. - Protocol::AvailabilityFetching => 100, + Protocol::ChunkFetching => 100, // 10 seems reasonable, considering group sizes of max 10 validators. Protocol::CollationFetching => 10, + // Validators are constantly self-selecting to request available data which may lead + // to constant load and occasional burstiness. + Protocol::AvailableDataFetching => 100, } } @@ -131,8 +144,9 @@ impl Protocol { /// Get the protocol name associated with each peer set as static str. 
pub const fn get_protocol_name_static(self) -> &'static str { match self { - Protocol::AvailabilityFetching => "/polkadot/req_availability/1", + Protocol::ChunkFetching => "/polkadot/req_chunk/1", Protocol::CollationFetching => "/polkadot/req_collation/1", + Protocol::AvailableDataFetching => "/polkadot/req_available_data/1", } } } diff --git a/polkadot/node/network/protocol/src/request_response/request.rs b/polkadot/node/network/protocol/src/request_response/request.rs index da0d4cd53b..3db34f673c 100644 --- a/polkadot/node/network/protocol/src/request_response/request.rs +++ b/polkadot/node/network/protocol/src/request_response/request.rs @@ -39,17 +39,20 @@ pub trait IsRequest { #[derive(Debug)] pub enum Requests { /// Request an availability chunk from a node. - AvailabilityFetching(OutgoingRequest), + ChunkFetching(OutgoingRequest), /// Fetch a collation from a collator which previously announced it. CollationFetching(OutgoingRequest), + /// Request full available data from a node. + AvailableDataFetching(OutgoingRequest), } impl Requests { /// Get the protocol this request conforms to. pub fn get_protocol(&self) -> Protocol { match self { - Self::AvailabilityFetching(_) => Protocol::AvailabilityFetching, + Self::ChunkFetching(_) => Protocol::ChunkFetching, Self::CollationFetching(_) => Protocol::CollationFetching, + Self::AvailableDataFetching(_) => Protocol::AvailableDataFetching, } } @@ -62,8 +65,9 @@ impl Requests { /// contained in the enum. pub fn encode_request(self) -> (Protocol, OutgoingRequest>) { match self { - Self::AvailabilityFetching(r) => r.encode_request(), + Self::ChunkFetching(r) => r.encode_request(), Self::CollationFetching(r) => r.encode_request(), + Self::AvailableDataFetching(r) => r.encode_request(), } } } @@ -92,6 +96,7 @@ pub struct OutgoingRequest { } /// Any error that can occur when sending a request. +#[derive(Debug)] pub enum RequestError { /// Response could not be decoded. 
InvalidResponse(DecodingError), diff --git a/polkadot/node/network/protocol/src/request_response/v1.rs b/polkadot/node/network/protocol/src/request_response/v1.rs index 269b55424b..f88fc7f566 100644 --- a/polkadot/node/network/protocol/src/request_response/v1.rs +++ b/polkadot/node/network/protocol/src/request_response/v1.rs @@ -18,7 +18,10 @@ use parity_scale_codec::{Decode, Encode}; -use polkadot_primitives::v1::{CandidateHash, CandidateReceipt, ErasureChunk, ValidatorIndex, CompressedPoV, Hash}; +use polkadot_primitives::v1::{ + AvailableData, CandidateHash, CandidateReceipt, ErasureChunk, ValidatorIndex, + CompressedPoV, Hash, +}; use polkadot_primitives::v1::Id as ParaId; use super::request::IsRequest; @@ -26,16 +29,16 @@ use super::Protocol; /// Request an availability chunk. #[derive(Debug, Copy, Clone, Encode, Decode)] -pub struct AvailabilityFetchingRequest { +pub struct ChunkFetchingRequest { /// Hash of candidate we want a chunk for. pub candidate_hash: CandidateHash, /// The index of the chunk to fetch. pub index: ValidatorIndex, } -/// Receive a rqeuested erasure chunk. +/// Receive a requested erasure chunk. #[derive(Debug, Clone, Encode, Decode)] -pub enum AvailabilityFetchingResponse { +pub enum ChunkFetchingResponse { /// The requested chunk data. #[codec(index = 0)] Chunk(ChunkResponse), @@ -44,10 +47,19 @@ pub enum AvailabilityFetchingResponse { NoSuchChunk, } +impl From> for ChunkFetchingResponse { + fn from(x: Option) -> Self { + match x { + Some(c) => ChunkFetchingResponse::Chunk(c), + None => ChunkFetchingResponse::NoSuchChunk, + } + } +} + /// Skimmed down variant of `ErasureChunk`. /// /// Instead of transmitting a full `ErasureChunk` we transmit `ChunkResponse` in -/// `AvailabilityFetchingResponse`, which omits the chunk's index. The index is already known by +/// `ChunkFetchingResponse`, which omits the chunk's index. 
The index is already known by /// the requester and by not transmitting it, we ensure the requester is going to use his index /// value for validating the response, thus making sure he got what he requested. #[derive(Debug, Clone, Encode, Decode)] @@ -66,7 +78,7 @@ impl From for ChunkResponse { impl ChunkResponse { /// Re-build an `ErasureChunk` from response and request. - pub fn recombine_into_chunk(self, req: &AvailabilityFetchingRequest) -> ErasureChunk { + pub fn recombine_into_chunk(self, req: &ChunkFetchingRequest) -> ErasureChunk { ErasureChunk { chunk: self.chunk, proof: self.proof, @@ -75,9 +87,9 @@ impl ChunkResponse { } } -impl IsRequest for AvailabilityFetchingRequest { - type Response = AvailabilityFetchingResponse; - const PROTOCOL: Protocol = Protocol::AvailabilityFetching; +impl IsRequest for ChunkFetchingRequest { + type Response = ChunkFetchingResponse; + const PROTOCOL: Protocol = Protocol::ChunkFetching; } /// Request the advertised collation at that relay-parent. @@ -101,3 +113,35 @@ impl IsRequest for CollationFetchingRequest { type Response = CollationFetchingResponse; const PROTOCOL: Protocol = Protocol::CollationFetching; } + +/// Request the entire available data for a candidate. +#[derive(Debug, Clone, Encode, Decode)] +pub struct AvailableDataFetchingRequest { + /// The candidate hash to get the available data for. + pub candidate_hash: CandidateHash, +} + +/// Receive the requested available data. +#[derive(Debug, Clone, Encode, Decode)] +pub enum AvailableDataFetchingResponse { + /// The requested data. + #[codec(index = 0)] + AvailableData(AvailableData), + /// Node was not in possession of the requested data. 
+ #[codec(index = 1)] + NoSuchData, +} + +impl From> for AvailableDataFetchingResponse { + fn from(x: Option) -> Self { + match x { + Some(data) => AvailableDataFetchingResponse::AvailableData(data), + None => AvailableDataFetchingResponse::NoSuchData, + } + } +} + +impl IsRequest for AvailableDataFetchingRequest { + type Response = AvailableDataFetchingResponse; + const PROTOCOL: Protocol = Protocol::AvailableDataFetching; +} diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index b38a26a4f1..ec463dc01e 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -267,7 +267,7 @@ impl NetworkBridgeMessage { #[derive(Debug, derive_more::From)] pub enum AvailabilityDistributionMessage { /// Incoming network request for an availability chunk. - AvailabilityFetchingRequest(IncomingRequest) + ChunkFetchingRequest(IncomingRequest) } /// Availability Recovery Message. @@ -280,16 +280,16 @@ pub enum AvailabilityRecoveryMessage { Option, // Optional backing group to request from first. oneshot::Sender>, ), - /// Event from the network bridge. + /// Incoming network request for available data. #[from] - NetworkBridgeUpdateV1(NetworkBridgeEvent), + AvailableDataFetchingRequest(IncomingRequest), } impl AvailabilityDistributionMessage { /// If the current variant contains the relay parent hash, return it. pub fn relay_parent(&self) -> Option { match self { - Self::AvailabilityFetchingRequest(_) => None, + Self::ChunkFetchingRequest(_) => None, } } } @@ -707,6 +707,7 @@ pub enum AllMessages { #[skip] AvailabilityDistribution(AvailabilityDistributionMessage), /// Message for the availability recovery subsystem. + #[skip] AvailabilityRecovery(AvailabilityRecoveryMessage), /// Message for the bitfield distribution subsystem. 
BitfieldDistribution(BitfieldDistributionMessage), @@ -740,8 +741,8 @@ pub enum AllMessages { GossipSupport(GossipSupportMessage), } -impl From> for AllMessages { - fn from(req: IncomingRequest) -> Self { +impl From> for AllMessages { + fn from(req: IncomingRequest) -> Self { From::::from(From::from(req)) } } @@ -755,3 +756,8 @@ impl From> for CollatorPro Self::CollationFetchingRequest(req) } } +impl From> for AllMessages { + fn from(req: IncomingRequest) -> Self { + From::::from(From::from(req)) + } +} diff --git a/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md b/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md index 757f258f37..db7fdebe82 100644 --- a/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md +++ b/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md @@ -11,12 +11,12 @@ their local availability store. ## Protocol This subsystem has no associated peer set right now, but instead relies on -a request/response protocol, defined by `Protocol::AvailabilityFetching`. +a request/response protocol, defined by `Protocol::ChunkFetching`. Input: - OverseerSignal::ActiveLeaves(`[ActiveLeavesUpdate]`) -- AvailabilityDistributionMessage{msg: AvailabilityFetchingRequest} +- AvailabilityDistributionMessage{msg: ChunkFetchingRequest} Output: @@ -33,7 +33,7 @@ Output: This subsystems monitors currently occupied cores for all active leaves. For each occupied core it will spawn a task fetching the erasure chunk which has the -`ValidatorIndex` of the node. For this an `AvailabilityFetchingRequest` is +`ValidatorIndex` of the node. For this a `ChunkFetchingRequest` is issued, via substrate's generic request/response protocol. The spawned task will start trying to fetch the chunk from validators in @@ -60,5 +60,5 @@ as we would like as many validators as possible to have their chunk. 
See this ### Serving On the other side the subsystem will listen for incoming -`AvailabilityFetchingRequest`s from the network bridge and will respond to +`ChunkFetchingRequest`s from the network bridge and will respond to queries, by looking the requested chunk up in the availability store. diff --git a/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md b/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md index 31ec018975..86026b67cc 100644 --- a/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md +++ b/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md @@ -24,29 +24,11 @@ Output: We hold a state which tracks the current recovery interactions we have live, as well as which request IDs correspond to which interactions. An interaction is a structure encapsulating all interaction with the network necessary to recover the available data. ```rust -type DataResponse = Result<(PeerId, ValidatorIndex, T), Unavailable>; - -enum Awaited { - Chunk(AwaitedData), - FullData(AwaitedData), -} - -struct AwaitedData { - issued_at: Instant, - validator_index: ValidatorIndex, - candidate_hash: CandidateHash, - response: ResponseChannel>, -} - struct State { /// Each interaction is implemented as its own async task, and these handles are for communicating with them. interactions: Map, /// A recent block hash for which state should be available. live_block_hash: Hash, - discovering_validators: Map>, - live_requests: Map, - next_request_id: RequestId, - connecting_validators: Stream<(AuthorityDiscoveryId, PeerId)>, /// interaction communication. This is cloned and given to interactions that are spun up. from_interaction_tx: Sender, @@ -65,25 +47,8 @@ struct Unavailable; enum FromInteraction { // An interaction concluded. Concluded(CandidateHash, Result), - // Make a request of a particular chunk from a particular validator. 
- MakeChunkRequest( - AuthorityDiscoveryId, - CandidateHash, - ValidatorIndex, - ResponseChannel>, - ), - // Make a request of the full data from a particular validator. - MakeDataRequest( - AuthorityDiscoveryId, - CandidateHash, - ValidatorIndex, - ResponseChannel>, - ) - // Report a peer. - ReportPeer( - PeerId, - Rep, - ), + // Send a request on the network. + NetworkRequest(Requests), } struct InteractionParams { @@ -100,7 +65,6 @@ enum InteractionPhase { // a random shuffling of the validators from the backing group which indicates the order // in which we connect to them and request the chunk. shuffled_backers: Vec, - requesting_pov: Option>> } RequestChunks { // a random shuffling of the validators which indicates the order in which we connect to the validators and @@ -140,51 +104,9 @@ On `Conclude`, shut down the subsystem. 1. Load the entry from the `interactions` map. It should always exist, if not for logic errors. Send the result to each member of `awaiting`. 1. Add the entry to the availability_lru. -#### `FromInteraction::MakeChunkRequest(discovery_pub, candidate_hash, validator_index, response)` +#### `FromInteraction::NetworkRequest(requests)` -1. Add an `Awaited::Chunk` to the `discovering_validators` map under `discovery_pub`. -1. Issue a `NetworkBridgeMessage::ConnectToValidators`. -1. Add the stream of connected validator events to `state.connecting_validators`. - -#### `FromInteraction::MakeDataRequest(discovery_pub, candidate_hash, validator_index, response)` - -1. Add an `Awaited::FullData` to the `discovering_validators` map under `discovery_pub`. -1. Issue a `NetworkBridgeMessage::ConnectToValidators`. -1. Add the stream of connected validator events to `state.connecting_validators`. - -#### `FromInteraction::ReportPeer(peer, rep)` - -1. Issue a `NetworkBridgeMessage::ReportPeer(peer, rep)`. - -### Responding to network events. - -#### On `connecting_validators` event: - -1. 
If the validator exists under `discovering_validators`, remove the entry. -1. For each `Awaited` in the entry, - 1. If `Awaited::Chunk` issue a `AvailabilityRecoveryV1Message::RequestChunk(next_request_id, candidate_hash, validator_index)` and make an entry in the `live_requests` map. - 1. If `Awaited::FullData` issue a `AvailabilityRecoveryV1Message::RequestFullData(next_request_id, candidate_hash, validator_index)` and make an entry in the `live_requests` map. - 1. Increment `next_request_id`. - -#### On receiving `AvailabilityRecoveryV1::RequestChunk(r_id, candidate_hash, validator_index)` - -1. Issue a `AvailabilityStore::QueryChunk(candidate_hash, validator_index, response)` message. -1. Whatever the result, issue a `AvailabilityRecoveryV1Message::Chunk(r_id, response)` message. - -#### On receiving `AvailabilityRecoveryV1::Chunk(r_id, chunk)` - -1. If there exists an entry under `r_id`, remove it. If there doesn't exist one, report the peer and return. If the entry is not `Awaited::Chunk` or the peer in the entry doesn't match the sending peer, reinstate the entry, report the peer, and return. -1. Send the chunk response on the `awaited_chunk` for the interaction to handle. - -#### On receiving `AvailabilityRecoveryV1::RequestFullData(r_id, candidate_hash)` - -1. Issue a `AvailabilityStore::QueryAvailableData(candidate_hash, response)` message. -1. Whatever the result, issue a `AvailabilityRecoveryV1Message::FullData(r_id, response)` message. - -#### On receiving `AvailabilityRecoveryV1::FullData(r_id, data)` - -1. If there exists an entry under `r_id`, remove it. If there doesn't exist one, report the peer and return. If the entry is not `Awaited::FullData` or the peer in the entry doesn't match the sending peer, reinstate the entry, report the peer, and return. -1. Send the data response on the `response` channel for the interaction to handle. +1. Forward with `NetworkBridgeMessage::SendRequests`. 
### Interaction logic @@ -209,18 +131,17 @@ const N_PARALLEL: usize = 50; Loop: * If the phase is `InteractionPhase::RequestFromBackers` * If the `requesting_pov` is `Some`, poll for updates on it. If it concludes, set `requesting_pov` to `None`. + * If the `requesting_pov` is `None`, take the next backer off the `shuffled_backers`. + * If the backer is `Some`, issue a `FromInteraction::NetworkRequest` with a network request for the `AvailableData` and wait for the response. * If it concludes with a `None` result, return to beginning. * If it concludes with available data, attempt a re-encoding. * If it has the correct erasure-root, break and issue a `Concluded(Ok(available_data))`. * If it has an incorrect erasure-root, issue a `FromInteraction::ReportPeer` message and return to beginning. - * If the `requesting_pov` is `None`, take the next backer off the `shuffled_backers`. - * If the backer is `Some`, initialize `(tx, rx)`, issue a `FromInteraction::MakeFullDataRequest(validator, candidate_hash, validator_index, tx)`, set `requesting_pov` to `Some` and return. * If the backer is `None`, set the phase to `InteractionPhase::RequestChunks` with a random shuffling of validators and empty `received_chunks` and `requesting_chunks`. + * If the phase is `InteractionPhase::RequestChunks`: * Poll for new updates from `requesting_chunks`. Check merkle proofs of any received chunks, and any failures should lead to issuance of a `FromInteraction::ReportPeer` message. * If `received_chunks` has more than `threshold` entries, attempt to recover the data. If that fails, or a re-encoding produces an incorrect erasure-root, break and issue a `Concluded(RecoveryError::Invalid)`. If correct, break and issue `Concluded(Ok(available_data))`. * While there are fewer than `N_PARALLEL` entries in `requesting_chunks`, * Pop the next item from `shuffling`. If it's empty and `requesting_chunks` is empty, break and set the phase to `Concluded(None)`. - * Initialize `(tx, rx)`. 
- * Issue a `FromInteraction::MakeChunkRequest(validator, candidate_hash, validator_index, tx)`. - * Add `rx` to `requesting_chunks`. + * Issue a `FromInteraction::NetworkRequest` and wait for the response in `requesting_chunks`.