diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 300fd2e714..6b53e9089c 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -96,6 +96,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "always-assert" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbf688625d06217d5b1bb0ea9d9c44a1635fd0ee3534466388d18203174f4d11" + [[package]] name = "ansi_term" version = "0.11.0" @@ -5322,10 +5328,10 @@ dependencies = [ name = "polkadot-collator-protocol" version = "0.1.0" dependencies = [ + "always-assert", "assert_matches", "env_logger 0.8.2", "futures 0.3.12", - "futures-timer 3.0.2", "log", "polkadot-node-network-protocol", "polkadot-node-primitives", @@ -5649,8 +5655,6 @@ dependencies = [ "polkadot-primitives", "sc-network", "strum", - "thiserror", - "zstd", ] [[package]] @@ -5845,6 +5849,8 @@ dependencies = [ "sp-std", "sp-trie", "sp-version", + "thiserror", + "zstd", ] [[package]] diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index b2a282f260..17a2769b2a 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -23,7 +23,7 @@ use futures::{FutureExt, SinkExt}; use polkadot_erasure_coding::branch_hash; use polkadot_node_network_protocol::request_response::{ - request::{OutgoingRequest, RequestError, Requests}, + request::{OutgoingRequest, RequestError, Requests, Recipient}, v1::{AvailabilityFetchingRequest, AvailabilityFetchingResponse}, }; use polkadot_primitives::v1::{ @@ -31,7 +31,7 @@ use polkadot_primitives::v1::{ SessionIndex, }; use polkadot_subsystem::messages::{ - AllMessages, AvailabilityStoreMessage, NetworkBridgeMessage, + AllMessages, AvailabilityStoreMessage, NetworkBridgeMessage, IfDisconnected, }; use 
polkadot_subsystem::{SubsystemContext, jaeger}; @@ -330,12 +330,12 @@ impl RunningTask { validator: &AuthorityDiscoveryId, ) -> std::result::Result { let (full_request, response_recv) = - OutgoingRequest::new(validator.clone(), self.request); + OutgoingRequest::new(Recipient::Authority(validator.clone()), self.request); let requests = Requests::AvailabilityFetching(full_request); self.sender .send(FromFetchTask::Message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendRequests(vec![requests]), + NetworkBridgeMessage::SendRequests(vec![requests], IfDisconnected::TryConnect) ))) .await .map_err(|_| TaskError::ShuttingDown)?; diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs index b8c70a324d..4fe314cee3 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs @@ -28,6 +28,7 @@ use sp_keyring::Sr25519Keyring; use polkadot_primitives::v1::{BlockData, CandidateHash, PoV, ValidatorIndex}; use polkadot_node_network_protocol::request_response::v1; +use polkadot_node_network_protocol::request_response::Recipient; use polkadot_subsystem::messages::AllMessages; use crate::metrics::Metrics; @@ -56,7 +57,7 @@ fn task_does_not_accept_invalid_chunk() { chunk_responses: { let mut m = HashMap::new(); m.insert( - Sr25519Keyring::Alice.public().into(), + Recipient::Authority(Sr25519Keyring::Alice.public().into()), AvailabilityFetchingResponse::Chunk( v1::ChunkResponse { chunk: vec![1,2,3], @@ -88,7 +89,7 @@ fn task_stores_valid_chunk() { chunk_responses: { let mut m = HashMap::new(); m.insert( - Sr25519Keyring::Alice.public().into(), + Recipient::Authority(Sr25519Keyring::Alice.public().into()), AvailabilityFetchingResponse::Chunk( v1::ChunkResponse { chunk: chunk.chunk.clone(), @@ -124,7 +125,7 @@ fn 
task_does_not_accept_wrongly_indexed_chunk() { chunk_responses: { let mut m = HashMap::new(); m.insert( - Sr25519Keyring::Alice.public().into(), + Recipient::Authority(Sr25519Keyring::Alice.public().into()), AvailabilityFetchingResponse::Chunk( v1::ChunkResponse { chunk: chunk.chunk.clone(), @@ -163,7 +164,7 @@ fn task_stores_valid_chunk_if_there_is_one() { chunk_responses: { let mut m = HashMap::new(); m.insert( - Sr25519Keyring::Alice.public().into(), + Recipient::Authority(Sr25519Keyring::Alice.public().into()), AvailabilityFetchingResponse::Chunk( v1::ChunkResponse { chunk: chunk.chunk.clone(), @@ -172,11 +173,11 @@ fn task_stores_valid_chunk_if_there_is_one() { ) ); m.insert( - Sr25519Keyring::Bob.public().into(), + Recipient::Authority(Sr25519Keyring::Bob.public().into()), AvailabilityFetchingResponse::NoSuchChunk ); m.insert( - Sr25519Keyring::Charlie.public().into(), + Recipient::Authority(Sr25519Keyring::Charlie.public().into()), AvailabilityFetchingResponse::Chunk( v1::ChunkResponse { chunk: vec![1,2,3], @@ -199,7 +200,7 @@ fn task_stores_valid_chunk_if_there_is_one() { struct TestRun { /// Response to deliver for a given validator index. /// None means, answer with NetworkError. - chunk_responses: HashMap, + chunk_responses: HashMap, /// Set of chunks that should be considered valid: valid_chunks: HashSet>, } @@ -240,11 +241,12 @@ impl TestRun { /// end. 
async fn handle_message(&self, msg: AllMessages) -> bool { match msg { - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs)) => { + AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::TryConnect)) => { let mut valid_responses = 0; for req in reqs { let req = match req { Requests::AvailabilityFetching(req) => req, + _ => panic!("Unexpected request"), }; let response = self.chunk_responses.get(&req.peer) .ok_or(network::RequestFailure::Refused); diff --git a/polkadot/node/network/availability-distribution/src/tests/state.rs b/polkadot/node/network/availability-distribution/src/tests/state.rs index 1d96dd1adf..d342453a0a 100644 --- a/polkadot/node/network/availability-distribution/src/tests/state.rs +++ b/polkadot/node/network/availability-distribution/src/tests/state.rs @@ -29,6 +29,7 @@ use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; use sp_keyring::Sr25519Keyring; use sp_core::{traits::SpawnNamed, testing::TaskExecutor}; use sc_network as network; +use sc_network::IfDisconnected; use sc_network::config as netconfig; use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal, messages::{AllMessages, @@ -201,7 +202,7 @@ impl TestState { tracing::trace!(target: LOG_TARGET, remaining_stores, "Stores left to go"); let msg = overseer_recv(&mut rx).await; match msg { - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs)) => { + AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::TryConnect)) => { for req in reqs { // Forward requests: let in_req = to_incoming_req(&executor, req); @@ -313,5 +314,6 @@ fn to_incoming_req( tx ) } + _ => panic!("Unexpected request!"), } } diff --git a/polkadot/node/network/bridge/src/action.rs b/polkadot/node/network/bridge/src/action.rs index 3ef33366df..a1fa2750f8 100644 --- a/polkadot/node/network/bridge/src/action.rs +++ b/polkadot/node/network/bridge/src/action.rs @@ -24,7 +24,7 @@ use polkadot_node_network_protocol::{ 
use polkadot_primitives::v1::{AuthorityDiscoveryId, BlockNumber}; use polkadot_subsystem::messages::{AllMessages, NetworkBridgeMessage}; use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal}; -use sc_network::Event as NetworkEvent; +use sc_network::{Event as NetworkEvent, IfDisconnected}; use polkadot_node_network_protocol::{request_response::Requests, ObservedRole}; @@ -45,7 +45,7 @@ pub(crate) enum Action { SendCollationMessages(Vec<(Vec, protocol_v1::CollationProtocol)>), /// Ask network to send requests. - SendRequests(Vec), + SendRequests(Vec, IfDisconnected), /// Ask network to connect to validators. ConnectToValidators { @@ -125,7 +125,7 @@ impl From NetworkBridgeMessage::SendCollationMessage(peers, msg) => { Action::SendCollationMessages(vec![(peers, msg)]) } - NetworkBridgeMessage::SendRequests(reqs) => Action::SendRequests(reqs), + NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => Action::SendRequests(reqs, if_disconnected), NetworkBridgeMessage::SendValidationMessages(msgs) => { Action::SendValidationMessages(msgs) } diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index d8913dbb25..fd3dd82ca1 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -235,11 +235,11 @@ where } } - Action::SendRequests(reqs) => { + Action::SendRequests(reqs, if_disconnected) => { for req in reqs { bridge .network_service - .start_request(&mut bridge.authority_discovery_service, req) + .start_request(&mut bridge.authority_discovery_service, req, if_disconnected) .await; } }, @@ -604,7 +604,7 @@ mod tests { use parking_lot::Mutex; use assert_matches::assert_matches; - use sc_network::Event as NetworkEvent; + use sc_network::{Event as NetworkEvent, IfDisconnected}; use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal}; use polkadot_subsystem::messages::{ @@ -681,7 +681,7 @@ mod tests { Box::pin((&mut 
self.action_tx).sink_map_err(Into::into)) } - async fn start_request(&self, _: &mut AD, _: Requests) { + async fn start_request(&self, _: &mut AD, _: Requests, _: IfDisconnected) { } } diff --git a/polkadot/node/network/bridge/src/multiplexer.rs b/polkadot/node/network/bridge/src/multiplexer.rs index d9475d0ea4..3901a91078 100644 --- a/polkadot/node/network/bridge/src/multiplexer.rs +++ b/polkadot/node/network/bridge/src/multiplexer.rs @@ -136,6 +136,11 @@ fn multiplex_single( decode_with_peer::(peer, payload)?, pending_response, )), + Protocol::CollationFetching => From::from(IncomingRequest::new( + peer, + decode_with_peer::(peer, payload)?, + pending_response, + )), }; Ok(r) } diff --git a/polkadot/node/network/bridge/src/network.rs b/polkadot/node/network/bridge/src/network.rs index ed25f9f368..28a32a5d33 100644 --- a/polkadot/node/network/bridge/src/network.rs +++ b/polkadot/node/network/bridge/src/network.rs @@ -29,7 +29,7 @@ use sc_network::{IfDisconnected, NetworkService, OutboundFailure, RequestFailure use polkadot_node_network_protocol::{ peer_set::PeerSet, - request_response::{OutgoingRequest, Requests}, + request_response::{OutgoingRequest, Requests, Recipient}, PeerId, UnifiedReputationChange as Rep, }; use polkadot_primitives::v1::{Block, Hash}; @@ -113,6 +113,7 @@ pub trait Network: Send + 'static { &self, authority_discovery: &mut AD, req: Requests, + if_disconnected: IfDisconnected, ); /// Report a given peer as either beneficial (+) or costly (-) according to the given scalar. 
@@ -202,6 +203,7 @@ impl Network for Arc> { &self, authority_discovery: &mut AD, req: Requests, + if_disconnected: IfDisconnected, ) { let ( protocol, @@ -212,14 +214,18 @@ impl Network for Arc> { }, ) = req.encode_request(); - let peer_id = authority_discovery - .get_addresses_by_authority_id(peer) - .await - .and_then(|addrs| { - addrs - .into_iter() - .find_map(|addr| peer_id_from_multiaddr(&addr)) - }); + let peer_id = match peer { + Recipient::Peer(peer_id) => Some(peer_id), + Recipient::Authority(authority) => + authority_discovery + .get_addresses_by_authority_id(authority) + .await + .and_then(|addrs| { + addrs + .into_iter() + .find_map(|addr| peer_id_from_multiaddr(&addr)) + }), + }; let peer_id = match peer_id { None => { @@ -244,7 +250,7 @@ impl Network for Arc> { protocol.into_protocol_name(), payload, pending_response, - IfDisconnected::TryConnect, + if_disconnected, ); } } diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml index 458a1df5a2..9f95fb94ac 100644 --- a/polkadot/node/network/collator-protocol/Cargo.toml +++ b/polkadot/node/network/collator-protocol/Cargo.toml @@ -14,12 +14,12 @@ polkadot-node-network-protocol = { path = "../../network/protocol" } polkadot-node-primitives = { path = "../../primitives" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } +always-assert = "0.1.2" [dev-dependencies] log = "0.4.13" env_logger = "0.8.2" assert_matches = "1.4.0" -futures-timer = "3.0.2" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 91d6768dc6..80c7576f43 100644 --- 
a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -21,7 +21,9 @@ use super::{LOG_TARGET, Result}; use futures::{select, FutureExt, channel::oneshot}; use polkadot_primitives::v1::{ - CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, CandidateHash, + CandidateHash, CandidateReceipt, CollatorId, CompressedPoV, CoreIndex, + CoreState, Hash, Id as ParaId, + PoV, ValidatorId }; use polkadot_subsystem::{ jaeger, PerLeafSpan, @@ -29,7 +31,9 @@ use polkadot_subsystem::{ messages::{AllMessages, CollatorProtocolMessage, NetworkBridgeMessage, NetworkBridgeEvent}, }; use polkadot_node_network_protocol::{ - peer_set::PeerSet, v1 as protocol_v1, View, PeerId, RequestId, OurView, + OurView, PeerId, View, peer_set::PeerSet, + request_response::{IncomingRequest, v1::{CollationFetchingRequest, CollationFetchingResponse}}, + v1 as protocol_v1 }; use polkadot_node_subsystem_util::{ validator_discovery, @@ -562,25 +566,61 @@ async fn process_msg( ); } }, + CollationFetchingRequest(incoming) => { + let _span = state.span_per_relay_parent.get(&incoming.payload.relay_parent).map(|s| s.child("request-collation")); + match state.collating_on { + Some(our_para_id) => { + if our_para_id == incoming.payload.para_id { + let (receipt, pov) = if let Some(collation) = state.collations.get_mut(&incoming.payload.relay_parent) { + collation.status.advance_to_requested(); + (collation.receipt.clone(), collation.pov.clone()) + } else { + tracing::warn!( + target: LOG_TARGET, + relay_parent = %incoming.payload.relay_parent, + "received a `RequestCollation` for a relay parent we don't have collation stored.", + ); + + return Ok(()); + }; + + let _span = _span.as_ref().map(|s| s.child("sending")); + send_collation(state, incoming, receipt, pov).await; + } else { + tracing::warn!( + target: LOG_TARGET, + for_para_id = %incoming.payload.para_id, + our_para_id = %our_para_id, + "received 
a `CollationFetchingRequest` for unexpected para_id", + ); + } + } + None => { + tracing::warn!( + target: LOG_TARGET, + for_para_id = %incoming.payload.para_id, + "received a `RequestCollation` while not collating on any para", + ); + } + } + } } Ok(()) } /// Issue a response to a previously requested collation. -#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(state, pov), fields(subsystem = LOG_TARGET))] async fn send_collation( - ctx: &mut impl SubsystemContext, state: &mut State, - request_id: RequestId, - origin: PeerId, + request: IncomingRequest, receipt: CandidateReceipt, pov: PoV, ) { - let pov = match protocol_v1::CompressedPoV::compress(&pov) { + let pov = match CompressedPoV::compress(&pov) { Ok(pov) => pov, Err(error) => { - tracing::debug!( + tracing::error!( target: LOG_TARGET, error = ?error, "Failed to create `CompressedPov`", @@ -589,22 +629,18 @@ async fn send_collation( } }; - let wire_message = protocol_v1::CollatorProtocolMessage::Collation(request_id, receipt, pov); - - ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( - vec![origin], - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), - ) - )).await; - + if let Err(_) = request.send_response(CollationFetchingResponse::Collation(receipt, pov)) { + tracing::warn!( + target: LOG_TARGET, + "Sending collation response failed", + ); + } state.metrics.on_collation_sent(); } /// A networking messages switch. 
-#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))] async fn handle_incoming_peer_message( - ctx: &mut impl SubsystemContext, state: &mut State, origin: PeerId, msg: protocol_v1::CollatorProtocolMessage, @@ -624,50 +660,6 @@ async fn handle_incoming_peer_message( "AdvertiseCollation message is not expected on the collator side of the protocol", ); } - RequestCollation(request_id, relay_parent, para_id) => { - let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("request-collation")); - match state.collating_on { - Some(our_para_id) => { - if our_para_id == para_id { - let (receipt, pov) = if let Some(collation) = state.collations.get_mut(&relay_parent) { - collation.status.advance_to_requested(); - (collation.receipt.clone(), collation.pov.clone()) - } else { - tracing::warn!( - target: LOG_TARGET, - relay_parent = %relay_parent, - "received a `RequestCollation` for a relay parent we don't have collation stored.", - ); - - return Ok(()); - }; - - let _span = _span.as_ref().map(|s| s.child("sending")); - send_collation(ctx, state, request_id, origin, receipt, pov).await; - } else { - tracing::warn!( - target: LOG_TARGET, - for_para_id = %para_id, - our_para_id = %our_para_id, - "received a `RequestCollation` for unexpected para_id", - ); - } - } - None => { - tracing::warn!( - target: LOG_TARGET, - for_para_id = %para_id, - "received a `RequestCollation` while not collating on any para", - ); - } - } - } - Collation(_, _, _) => { - tracing::warn!( - target: LOG_TARGET, - "Collation message is not expected on the collator side of the protocol", - ); - } CollationSeconded(statement) => { if !matches!(statement.payload(), Statement::Seconded(_)) { tracing::warn!( @@ -759,7 +751,7 @@ async fn handle_network_msg( handle_our_view_change(state, view).await?; } PeerMessage(remote, msg) => { - handle_incoming_peer_message(ctx, 
state, remote, msg).await?; + handle_incoming_peer_message(state, remote, msg).await?; } } @@ -861,7 +853,7 @@ mod tests { use assert_matches::assert_matches; use futures::{executor, future, Future, channel::mpsc}; - use sp_core::crypto::Pair; + use sp_core::{crypto::Pair, Decode}; use sp_keyring::Sr25519Keyring; use polkadot_primitives::v1::{ @@ -872,7 +864,11 @@ mod tests { use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger}; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_subsystem_testhelpers as test_helpers; - use polkadot_node_network_protocol::{view, our_view}; + use polkadot_node_network_protocol::{ + our_view, + view, + request_response::request::IncomingRequest, + }; #[derive(Default)] struct TestCandidateBuilder { @@ -1380,41 +1376,33 @@ mod tests { // advertise it. expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await; - let request_id = 42; - // Request a collation. + let (tx, rx) = oneshot::channel(); overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer.clone(), - protocol_v1::CollatorProtocolMessage::RequestCollation( - request_id, - test_state.relay_parent, - test_state.para_id, - ) + CollatorProtocolMessage::CollationFetchingRequest( + IncomingRequest::new( + peer, + CollationFetchingRequest { + relay_parent: test_state.relay_parent, + para_id: test_state.para_id, + }, + tx, ) ) ).await; - // Wait for the reply. 
assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( - to, - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + rx.await, + Ok(full_response) => { + let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse + = CollationFetchingResponse::decode( + &mut full_response.result + .expect("We should have a proper answer").as_ref() ) - ) => { - assert_eq!(to, vec![peer]); - assert_matches!( - wire_message, - protocol_v1::CollatorProtocolMessage::Collation(req_id, receipt, pov) => { - assert_eq!(req_id, request_id); - assert_eq!(receipt, candidate); - assert_eq!(pov.decompress().unwrap(), pov_block); - } - ); + .expect("Decoding should work"); + assert_eq!(receipt, candidate); + assert_eq!(pov.decompress().unwrap(), pov_block); } ); @@ -1424,19 +1412,25 @@ mod tests { let peer = test_state.validator_peer_id[2].clone(); // Re-request a collation. + let (tx, rx) = oneshot::channel(); overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer.clone(), - protocol_v1::CollatorProtocolMessage::RequestCollation( - 43, - old_relay_parent, - test_state.para_id, - ) + CollatorProtocolMessage::CollationFetchingRequest( + IncomingRequest::new( + peer, + CollationFetchingRequest { + relay_parent: old_relay_parent, + para_id: test_state.para_id, + }, + tx, ) ) ).await; + // Re-requesting collation should fail: + assert_matches!( + rx.await, + Err(_) => {} + ); assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none()); diff --git a/polkadot/node/network/collator-protocol/src/lib.rs b/polkadot/node/network/collator-protocol/src/lib.rs index fdaf33b519..d4c4facdf9 100644 --- a/polkadot/node/network/collator-protocol/src/lib.rs +++ b/polkadot/node/network/collator-protocol/src/lib.rs @@ -20,7 +20,6 @@ #![deny(missing_docs, unused_crate_dependencies)] 
#![recursion_limit="256"] -use std::time::Duration; use futures::{channel::oneshot, FutureExt, TryFutureExt}; use thiserror::Error; @@ -44,7 +43,6 @@ mod collator_side; mod validator_side; const LOG_TARGET: &'static str = "parachain::collator-protocol"; -const REQUEST_TIMEOUT: Duration = Duration::from_secs(1); #[derive(Debug, Error)] enum Error { @@ -94,7 +92,6 @@ impl CollatorProtocolSubsystem { match self.protocol_side { ProtocolSide::Validator(metrics) => validator_side::run( ctx, - REQUEST_TIMEOUT, metrics, ).await, ProtocolSide::Collator(id, metrics) => collator_side::run( @@ -129,7 +126,7 @@ where #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] async fn modify_reputation(ctx: &mut Context, peer: PeerId, rep: Rep) where - Context: SubsystemContext, + Context: SubsystemContext, { tracing::trace!( target: LOG_TARGET, diff --git a/polkadot/node/network/collator-protocol/src/validator_side.rs b/polkadot/node/network/collator-protocol/src/validator_side.rs index 7079d9c2de..a98b1c36b7 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side.rs @@ -14,15 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . 
-use std::{collections::{HashMap, HashSet}, time::Duration, task::Poll, sync::Arc}; +use std::{collections::{HashMap, HashSet}, sync::Arc, task::Poll}; -use futures::{ - StreamExt, - FutureExt, - channel::oneshot, - future::BoxFuture, - stream::FuturesUnordered, -}; +use futures::{FutureExt, channel::oneshot, future::{Fuse, FusedFuture, BoxFuture}}; +use always_assert::never; use polkadot_primitives::v1::{ Id as ParaId, CandidateReceipt, CollatorId, Hash, PoV, @@ -32,18 +27,25 @@ use polkadot_subsystem::{ FromOverseer, OverseerSignal, SubsystemContext, messages::{ AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage, - NetworkBridgeEvent, + NetworkBridgeEvent, IfDisconnected, }, }; use polkadot_node_network_protocol::{ - v1 as protocol_v1, View, OurView, PeerId, RequestId, UnifiedReputationChange as Rep, + OurView, PeerId, UnifiedReputationChange as Rep, View, + request_response::{OutgoingRequest, Requests, request::{Recipient, RequestError}}, v1 as protocol_v1 }; -use polkadot_node_subsystem_util::{TimeoutExt as _, metrics::{self, prometheus}}; +use polkadot_node_network_protocol::request_response::v1::{CollationFetchingRequest, CollationFetchingResponse}; +use polkadot_node_network_protocol::request_response as req_res; +use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_node_primitives::{Statement, SignedFullStatement}; use super::{modify_reputation, LOG_TARGET, Result}; const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message"); +/// Message could not be decoded properly. +const COST_CORRUPTED_MESSAGE: Rep = Rep::CostMinor("Message was corrupt"); +/// Network errors that originated at the remote host should have same cost as timeout. 
+const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error"); const COST_REQUEST_TIMED_OUT: Rep = Rep::CostMinor("A collation request has timed out"); const COST_REPORT_BAD: Rep = Rep::CostMajor("A collator was reported by another subsystem"); const BENEFIT_NOTIFY_GOOD: Rep = Rep::BenefitMinor("A collator was noted good by another subsystem"); @@ -51,6 +53,7 @@ const BENEFIT_NOTIFY_GOOD: Rep = Rep::BenefitMinor("A collator was noted good by #[derive(Clone, Default)] pub struct Metrics(Option); + impl Metrics { fn on_request(&self, succeeded: std::result::Result<(), ()>) { if let Some(metrics) = &self.0 { @@ -118,60 +121,13 @@ impl metrics::Metrics for Metrics { } } -#[derive(Debug)] -enum CollationRequestResult { - Received(RequestId), - Timeout(RequestId), -} - -/// A Future representing an ongoing collation request. -/// It may timeout or end in a graceful fashion if a requested -/// collation has been received sucessfully or chain has moved on. -struct CollationRequest { - // The response for this request has been received successfully or - // chain has moved forward and this request is no longer relevant. - received: oneshot::Receiver<()>, - - // The timeout of this request. - timeout: Duration, - - // The id of this request. - request_id: RequestId, - - // A jaeger span corresponding to the lifetime of the request. - span: Option, -} - -impl CollationRequest { - async fn wait(self) -> CollationRequestResult { - use CollationRequestResult::*; - - let CollationRequest { - received, - timeout, - request_id, - mut span, - } = self; - - match received.timeout(timeout).await { - None => { - span.as_mut().map(|s| s.add_string_tag("success", "false")); - Timeout(request_id) - } - Some(_) => { - span.as_mut().map(|s| s.add_string_tag("success", "true")); - Received(request_id) - } - } - } -} - struct PerRequest { - // The sender side to signal the `CollationRequest` to resolve successfully. - received: oneshot::Sender<()>, - - // Send result here. 
- result: oneshot::Sender<(CandidateReceipt, PoV)>, + /// Responses from collator. + from_collator: Fuse>>, + /// Sender to forward to initial requester. + to_requester: oneshot::Sender<(CandidateReceipt, PoV)>, + /// A jaeger span corresponding to the lifetime of the request. + span: Option, } /// All state relevant for the validator side of the protocol lives here. @@ -190,31 +146,12 @@ struct State { /// per collator per source per relay-parent. advertisements: HashMap>, - /// Derive RequestIds from this. - next_request_id: RequestId, - /// The collations we have requested by relay parent and para id. /// /// For each relay parent and para id we may be connected to a number /// of collators each of those may have advertised a different collation. /// So we group such cases here. - requested_collations: HashMap<(Hash, ParaId, PeerId), RequestId>, - - /// Housekeeping handles we need to have per request to: - /// - cancel ongoing requests - /// - reply with collations to other subsystems. - requests_info: HashMap, - - /// Collation requests that are currently in progress. - requests_in_progress: FuturesUnordered>, - - /// Delay after which a collation request would time out. - request_timeout: Duration, - - /// Leaves have recently moved out of scope. - /// These are looked into when we receive previously requested collations that we - /// are no longer interested in. - recently_removed_heads: HashSet, + requested_collations: HashMap<(Hash, ParaId, PeerId), PerRequest>, /// Metrics. 
metrics: Metrics, @@ -336,92 +273,13 @@ async fn handle_peer_view_change( advertisements.retain(|(_, relay_parent)| !removed.contains(relay_parent)); } - let mut requests_to_cancel = Vec::new(); - for removed in removed.into_iter() { - state.requested_collations.retain(|k, v| { - if k.0 == removed { - requests_to_cancel.push(*v); - false - } else { - true - } - }); - } - - for r in requests_to_cancel.into_iter() { - if let Some(per_request) = state.requests_info.remove(&r) { - per_request.received.send(()).map_err(|_| oneshot::Canceled)?; - } + state.requested_collations.retain(|k, _| k.0 != removed); } Ok(()) } -/// We have received a collation. -/// - Cancel all ongoing requests -/// - Reply to interested parties if any -/// - Store collation. -#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))] -async fn received_collation( - ctx: &mut Context, - state: &mut State, - origin: PeerId, - request_id: RequestId, - receipt: CandidateReceipt, - pov: protocol_v1::CompressedPoV, -) -where - Context: SubsystemContext -{ - let relay_parent = receipt.descriptor.relay_parent; - let para_id = receipt.descriptor.para_id; - - if let Some(id) = state.requested_collations.remove( - &(relay_parent, para_id, origin.clone()) - ) { - if id == request_id { - if let Some(per_request) = state.requests_info.remove(&id) { - let _ = per_request.received.send(()); - if state.known_collators.get(&origin).is_some() { - let pov = match pov.decompress() { - Ok(pov) => pov, - Err(error) => { - tracing::debug!( - target: LOG_TARGET, - %request_id, - ?error, - "Failed to extract PoV", - ); - return; - } - }; - - let _span = jaeger::pov_span(&pov, "received-collation"); - - tracing::debug!( - target: LOG_TARGET, - %request_id, - ?para_id, - ?relay_parent, - candidate_hash = ?receipt.hash(), - "Received collation", - ); - - let _ = per_request.result.send((receipt.clone(), pov.clone())); - state.metrics.on_request(Ok(())); - } - } - } - } else { - // If 
this collation is not just a delayed one that we were expecting, - // but our view has moved on, in that case modify peer's reputation. - if !state.recently_removed_heads.contains(&relay_parent) { - modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; - } - } -} - /// Request a collation from the network. /// This function will /// - Check for duplicate requests. @@ -452,7 +310,7 @@ where } if state.requested_collations.contains_key(&(relay_parent, para_id.clone(), peer_id.clone())) { - tracing::trace!( + tracing::warn!( target: LOG_TARGET, peer_id = %peer_id, %para_id, @@ -462,54 +320,37 @@ where return; } - let request_id = state.next_request_id; - state.next_request_id += 1; - - let (tx, rx) = oneshot::channel(); + let (full_request, response_recv) = + OutgoingRequest::new(Recipient::Peer(peer_id), CollationFetchingRequest { + relay_parent, + para_id, + }); + let requests = Requests::CollationFetching(full_request); let per_request = PerRequest { - received: tx, - result, - }; - - let request = CollationRequest { - received: rx, - timeout: state.request_timeout, - request_id, + from_collator: response_recv.boxed().fuse(), + to_requester: result, span: state.span_per_relay_parent.get(&relay_parent).map(|s| { s.child_builder("collation-request") .with_para_id(para_id) .build() }), + }; - state.requested_collations.insert((relay_parent, para_id.clone(), peer_id.clone()), request_id); - - state.requests_info.insert(request_id, per_request); - - state.requests_in_progress.push(request.wait().boxed()); + state.requested_collations.insert((relay_parent, para_id.clone(), peer_id.clone()), per_request); tracing::debug!( target: LOG_TARGET, peer_id = %peer_id, %para_id, - %request_id, ?relay_parent, "Requesting collation", ); - let wire_message = protocol_v1::CollatorProtocolMessage::RequestCollation( - request_id, - relay_parent, - para_id, - ); - ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( - vec![peer_id], - 
protocol_v1::CollationProtocol::CollatorProtocol(wire_message), - ) - )).await; + NetworkBridgeMessage::SendRequests(vec![requests], IfDisconnected::ImmediateError)) + ).await; } /// Notify `CandidateSelectionSubsystem` that a collation has been advertised. @@ -564,16 +405,12 @@ where ); } } - RequestCollation(_, _, _) => { - // This is a validator side of the protocol, collation requests are not expected here. - modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; - } - Collation(request_id, receipt, pov) => { - let _span = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent) - .map(|s| s.child("received-collation")); - received_collation(ctx, state, origin, request_id, receipt, pov).await; - } CollationSeconded(_) => { + tracing::warn!( + target: LOG_TARGET, + peer_id = ?origin, + "Unexpected `CollationSeconded` message, decreasing reputation", + ); modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; } } @@ -587,21 +424,9 @@ async fn remove_relay_parent( state: &mut State, relay_parent: Hash, ) -> Result<()> { - let mut remove_these = Vec::new(); - - state.requested_collations.retain(|k, v| { - if k.0 == relay_parent { - remove_these.push(*v); - } + state.requested_collations.retain(|k, _| { k.0 != relay_parent }); - - for id in remove_these.into_iter() { - if let Some(info) = state.requests_info.remove(&id) { - info.received.send(()).map_err(|_| oneshot::Canceled)?; - } - } - Ok(()) } @@ -628,11 +453,7 @@ async fn handle_our_view_change( .cloned() .collect::>(); - // Update the set of recently removed chain heads. - state.recently_removed_heads.clear(); - for removed in removed.into_iter() { - state.recently_removed_heads.insert(removed.clone()); remove_relay_parent(state, removed).await?; state.span_per_relay_parent.remove(&removed); } @@ -640,30 +461,6 @@ async fn handle_our_view_change( Ok(()) } -/// A request has timed out. 
-#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] -async fn request_timed_out( - ctx: &mut Context, - state: &mut State, - id: RequestId, -) -where - Context: SubsystemContext -{ - state.metrics.on_request(Err(())); - - // We have to go backwards in the map, again. - if let Some(key) = find_val_in_map(&state.requested_collations, &id) { - if let Some(_) = state.requested_collations.remove(&key) { - if let Some(_) = state.requests_info.remove(&id) { - let peer_id = key.2; - - modify_reputation(ctx, peer_id, COST_REQUEST_TIMED_OUT).await; - } - } - } -} - /// Bridge event switch. #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] async fn handle_network_msg( @@ -753,6 +550,12 @@ where ); } } + CollationFetchingRequest(_) => { + tracing::warn!( + target: LOG_TARGET, + "CollationFetchingRequest message is not expected on the validator side of the protocol", + ); + } } } @@ -760,7 +563,6 @@ where #[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))] pub(crate) async fn run( mut ctx: Context, - request_timeout: Duration, metrics: Metrics, ) -> Result<()> where @@ -770,7 +572,6 @@ where use OverseerSignal::*; let mut state = State { - request_timeout, metrics, ..Default::default() }; @@ -789,46 +590,166 @@ where continue; } - while let Poll::Ready(Some(request)) = futures::poll!(state.requests_in_progress.next()) { - let _timer = state.metrics.time_handle_collation_request_result(); - - // Request has timed out, we need to penalize the collator and re-send the request - // if the chain has not moved on yet. 
- match request { - CollationRequestResult::Timeout(id) => { - tracing::debug!(target: LOG_TARGET, request_id=%id, "Collation timed out"); - request_timed_out(&mut ctx, &mut state, id).await; - } - CollationRequestResult::Received(id) => { - state.requests_info.remove(&id); - } + let mut retained_requested = HashSet::new(); + for ((hash, para_id, peer_id), per_req) in state.requested_collations.iter_mut() { + // Despite the await, this won't block: + let finished = poll_collation_response( + &mut ctx, &state.metrics, &state.span_per_relay_parent, + hash, para_id, peer_id, per_req + ).await; + if !finished { + retained_requested.insert((*hash, *para_id, *peer_id)); } } - + state.requested_collations.retain(|k, _| retained_requested.contains(k)); futures::pending!(); } - Ok(()) } -fn find_val_in_map(map: &HashMap, val: &V) -> Option { - map - .iter() - .find_map(|(k, v)| if v == val { Some(k.clone()) } else { None }) +/// Poll collation response, return immediately if there is none. +/// +/// Ready responses are handled, by logging and decreasing peer's reputation on error and by +/// forwarding proper responses to the requester. +/// +/// Returns: `true` if `from_collator` future was ready. +async fn poll_collation_response( + ctx: &mut Context, + metrics: &Metrics, + spans: &HashMap, + hash: &Hash, + para_id: &ParaId, + peer_id: &PeerId, + per_req: &mut PerRequest +) +-> bool +where + Context: SubsystemContext +{ + if never!(per_req.from_collator.is_terminated()) { + tracing::error!( + target: LOG_TARGET, + "We remove pending responses once received, this should not happen." 
+ ); + return true + } + + if let Poll::Ready(response) = futures::poll!(&mut per_req.from_collator) { + let _span = spans.get(&hash) + .map(|s| s.child("received-collation")); + let _timer = metrics.time_handle_collation_request_result(); + + let mut metrics_result = Err(()); + let mut success = "false"; + + match response { + Err(RequestError::InvalidResponse(err)) => { + tracing::warn!( + target: LOG_TARGET, + hash = ?hash, + para_id = ?para_id, + peer_id = ?peer_id, + err = ?err, + "Collator provided response that could not be decoded" + ); + modify_reputation(ctx, *peer_id, COST_CORRUPTED_MESSAGE).await; + } + Err(RequestError::NetworkError(err)) => { + tracing::warn!( + target: LOG_TARGET, + hash = ?hash, + para_id = ?para_id, + peer_id = ?peer_id, + err = ?err, + "Fetching collation failed due to network error" + ); + // A minor decrease in reputation for any network failure seems + // sensible. In theory this could be exploited, by DoSing this node, + // which would result in reduced reputation for proper nodes, but the + // same can happen for penalties on timeouts, which we also have. + modify_reputation(ctx, *peer_id, COST_NETWORK_ERROR).await; + } + Err(RequestError::Canceled(_)) => { + tracing::warn!( + target: LOG_TARGET, + hash = ?hash, + para_id = ?para_id, + peer_id = ?peer_id, + "Request timed out" + ); + // A minor decrease in reputation for any network failure seems + // sensible. In theory this could be exploited, by DoSing this node, + // which would result in reduced reputation for proper nodes, but the + // same can happen for penalties on timeouts, which we also have. 
+ modify_reputation(ctx, *peer_id, COST_REQUEST_TIMED_OUT).await; + } + Ok(CollationFetchingResponse::Collation(receipt, compressed_pov)) => { + match compressed_pov.decompress() { + Ok(pov) => { + tracing::debug!( + target: LOG_TARGET, + para_id = ?para_id, + hash = ?hash, + candidate_hash = ?receipt.hash(), + "Received collation", + ); + + // Actual sending: + let _span = jaeger::pov_span(&pov, "received-collation"); + let (mut tx, _) = oneshot::channel(); + std::mem::swap(&mut tx, &mut (per_req.to_requester)); + let result = tx.send((receipt, pov)); + + if let Err(_) = result { + tracing::warn!( + target: LOG_TARGET, + hash = ?hash, + para_id = ?para_id, + peer_id = ?peer_id, + "Sending response back to requester failed (receiving side closed)" + ); + } else { + metrics_result = Ok(()); + success = "true"; + } + + } + Err(error) => { + tracing::warn!( + target: LOG_TARGET, + hash = ?hash, + para_id = ?para_id, + peer_id = ?peer_id, + ?error, + "Failed to extract PoV", + ); + modify_reputation(ctx, *peer_id, COST_CORRUPTED_MESSAGE).await; + } + }; + } + }; + metrics.on_request(metrics_result); + per_req.span.as_mut().map(|s| s.add_string_tag("success", success)); + true + } else { + false + } } #[cfg(test)] mod tests { use super::*; - use std::iter; + use std::{iter, time::Duration}; use futures::{executor, future, Future}; - use sp_core::crypto::Pair; + use polkadot_node_subsystem_util::TimeoutExt; + use sp_core::{crypto::Pair, Encode}; use assert_matches::assert_matches; - use futures_timer::Delay; - use polkadot_primitives::v1::{BlockData, CollatorPair}; + use polkadot_primitives::v1::{BlockData, CollatorPair, CompressedPoV}; use polkadot_subsystem_testhelpers as test_helpers; - use polkadot_node_network_protocol::our_view; + use polkadot_node_network_protocol::{our_view, + request_response::Requests + }; #[derive(Clone)] struct TestState { @@ -878,7 +799,7 @@ mod tests { let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); - 
let subsystem = run(context, Duration::from_millis(50), Metrics::default()); + let subsystem = run(context, Metrics::default()); let test_fut = test(TestHarness { virtual_overseer }); @@ -986,125 +907,6 @@ mod tests { }); } - // Test that an issued request times out a number of times until our view moves on. - #[test] - fn collation_request_times_out() { - let test_state = TestState::default(); - - test_harness(|test_harness| async move { - let TestHarness { - mut virtual_overseer, - } = test_harness; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]) - ) - ).await; - - let peer_b = PeerId::random(); - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::Declare( - test_state.collators[0].public(), - ), - ) - ) - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - test_state.relay_parent, - test_state.chain_ids[0], - ) - ) - ) - ).await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( - relay_parent, - para_id, - collator, - )) => { - assert_eq!(relay_parent, test_state.relay_parent); - assert_eq!(para_id, test_state.chain_ids[0]); - assert_eq!(collator, test_state.collators[0].public()); - } - ); - - let (tx, _rx) = oneshot::channel(); - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::FetchCollation( - test_state.relay_parent, - test_state.collators[0].public(), - test_state.chain_ids[0], - tx, - ) - ).await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendCollationMessage( - 
peers, - protocol_v1::CollationProtocol::CollatorProtocol( - protocol_v1::CollatorProtocolMessage::RequestCollation( - _id, - relay_parent, - para_id, - ) - ) - ) - ) => { - assert_eq!(relay_parent, test_state.relay_parent); - assert_eq!(peers, vec![peer_b.clone()]); - assert_eq!(para_id, test_state.chain_ids[0]); - }); - - // Don't send a response and we shoud see reputation penalties to the - // collator. - Delay::new(Duration::from_millis(50)).await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep) - ) => { - assert_eq!(peer, peer_b); - assert_eq!(rep, COST_REQUEST_TIMED_OUT); - } - ); - - // Deactivate the relay parent in question. - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(our_view![Hash::repeat_byte(0x42)]) - ) - ).await; - - // After we've deactivated it we are not expecting any more requests - // for timed out collations. - assert!( - overseer_recv_with_timeout( - &mut virtual_overseer, - Duration::from_secs(1), - ).await.is_none() - ); - }); - } - // Test that other subsystems may modify collators' reputations. 
#[test] fn collator_reporting_works() { @@ -1309,81 +1111,64 @@ mod tests { ) ).await; - let (request_id, peer_id) = assert_matches!( + let response_channel = assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendCollationMessage( - peers, - protocol_v1::CollationProtocol::CollatorProtocol( - protocol_v1::CollatorProtocolMessage::RequestCollation( - id, - relay_parent, - para_id, - ) - ) - ) + AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) ) => { - assert_eq!(relay_parent, test_state.relay_parent); - assert_eq!(para_id, test_state.chain_ids[0]); - (id, peers[0].clone()) + let req = reqs.into_iter().next() + .expect("There should be exactly one request"); + match req { + Requests::CollationFetching(req) => { + let payload = req.payload; + assert_eq!(payload.relay_parent, test_state.relay_parent); + assert_eq!(payload.para_id, test_state.chain_ids[0]); + req.pending_response + } + _ => panic!("Unexpected request"), + } }); let mut candidate_a = CandidateReceipt::default(); candidate_a.descriptor.para_id = test_state.chain_ids[0]; candidate_a.descriptor.relay_parent = test_state.relay_parent; + response_channel.send(Ok( + CollationFetchingResponse::Collation( + candidate_a.clone(), + CompressedPoV::compress(&PoV { + block_data: BlockData(vec![]), + }).unwrap(), + ).encode() + )).expect("Sending response should succeed"); - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_id, - protocol_v1::CollatorProtocolMessage::Collation( - request_id, - candidate_a.clone(), - protocol_v1::CompressedPoV::compress(&PoV { - block_data: BlockData(vec![]), - }).unwrap(), - ) - ) - ) - ).await; - - let (request_id, peer_id) = assert_matches!( + let response_channel = assert_matches!( overseer_recv(&mut virtual_overseer).await, - 
AllMessages::NetworkBridge(NetworkBridgeMessage::SendCollationMessage( - peers, - protocol_v1::CollationProtocol::CollatorProtocol( - protocol_v1::CollatorProtocolMessage::RequestCollation( - id, - relay_parent, - para_id, - ) - ) - ) + AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) ) => { - assert_eq!(relay_parent, test_state.relay_parent); - assert_eq!(para_id, test_state.chain_ids[0]); - (id, peers[0].clone()) + let req = reqs.into_iter().next() + .expect("There should be exactly one request"); + match req { + Requests::CollationFetching(req) => { + let payload = req.payload; + assert_eq!(payload.relay_parent, test_state.relay_parent); + assert_eq!(payload.para_id, test_state.chain_ids[0]); + req.pending_response + } + _ => panic!("Unexpected request"), + } }); let mut candidate_b = CandidateReceipt::default(); candidate_b.descriptor.para_id = test_state.chain_ids[0]; candidate_b.descriptor.relay_parent = test_state.relay_parent; - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_id, - protocol_v1::CollatorProtocolMessage::Collation( - request_id, - candidate_b.clone(), - protocol_v1::CompressedPoV::compress(&PoV { - block_data: BlockData(vec![1, 2, 3]), - }).unwrap(), - ) - ) - ) - ).await; + response_channel.send(Ok( + CollationFetchingResponse::Collation( + candidate_b.clone(), + CompressedPoV::compress(&PoV { + block_data: BlockData(vec![1, 2, 3]), + }).unwrap(), + ).encode() + )).expect("Sending response should succeed"); let collation_0 = rx_0.await.unwrap(); let collation_1 = rx_1.await.unwrap(); diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs index e7a86bd507..585cdcf9e8 100644 --- a/polkadot/node/network/pov-distribution/src/lib.rs +++ b/polkadot/node/network/pov-distribution/src/lib.rs @@ -22,9 +22,7 @@ #![deny(unused_crate_dependencies)] 
#![warn(missing_docs)] -use polkadot_primitives::v1::{ - Hash, PoV, CandidateDescriptor, ValidatorId, Id as ParaId, CoreIndex, CoreState, -}; +use polkadot_primitives::v1::{CandidateDescriptor, CompressedPoV, CoreIndex, CoreState, Hash, Id as ParaId, PoV, ValidatorId}; use polkadot_subsystem::{ ActiveLeavesUpdate, OverseerSignal, SubsystemContext, SubsystemResult, SubsystemError, Subsystem, FromOverseer, SpawnedSubsystem, @@ -107,7 +105,7 @@ struct State { } struct BlockBasedState { - known: HashMap, protocol_v1::CompressedPoV)>, + known: HashMap, CompressedPoV)>, /// All the PoVs we are or were fetching, coupled with channels expecting the data. /// @@ -135,7 +133,7 @@ fn awaiting_message(relay_parent: Hash, awaiting: Vec) fn send_pov_message( relay_parent: Hash, pov_hash: Hash, - pov: &protocol_v1::CompressedPoV, + pov: &CompressedPoV, ) -> protocol_v1::ValidationProtocol { protocol_v1::ValidationProtocol::PoVDistribution( protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov.clone()) @@ -274,7 +272,7 @@ async fn distribute_to_awaiting( metrics: &Metrics, relay_parent: Hash, pov_hash: Hash, - pov: &protocol_v1::CompressedPoV, + pov: &CompressedPoV, ) { // Send to all peers who are awaiting the PoV and have that relay-parent in their view. 
// @@ -487,7 +485,7 @@ async fn handle_distribute( } } - let encoded_pov = match protocol_v1::CompressedPoV::compress(&*pov) { + let encoded_pov = match CompressedPoV::compress(&*pov) { Ok(pov) => pov, Err(error) => { tracing::debug!( @@ -583,7 +581,7 @@ async fn handle_incoming_pov( peer: PeerId, relay_parent: Hash, pov_hash: Hash, - encoded_pov: protocol_v1::CompressedPoV, + encoded_pov: CompressedPoV, ) { let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) { None => { diff --git a/polkadot/node/network/pov-distribution/src/tests.rs b/polkadot/node/network/pov-distribution/src/tests.rs index d8ceab5537..6d07eac6b5 100644 --- a/polkadot/node/network/pov-distribution/src/tests.rs +++ b/polkadot/node/network/pov-distribution/src/tests.rs @@ -24,10 +24,7 @@ use tracing::trace; use sp_keyring::Sr25519Keyring; -use polkadot_primitives::v1::{ - AuthorityDiscoveryId, BlockData, CoreState, GroupRotationInfo, Id as ParaId, - ScheduledCore, ValidatorIndex, SessionIndex, SessionInfo, -}; +use polkadot_primitives::v1::{AuthorityDiscoveryId, BlockData, CoreState, GroupRotationInfo, Id as ParaId, ScheduledCore, SessionIndex, SessionInfo, ValidatorIndex}; use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger}; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::TimeoutExt; @@ -401,7 +398,7 @@ fn ask_validators_for_povs() { protocol_v1::PoVDistributionMessage::SendPoV( current, pov_hash, - protocol_v1::CompressedPoV::compress(&pov_block).unwrap(), + CompressedPoV::compress(&pov_block).unwrap(), ), ) ) @@ -647,7 +644,7 @@ fn distributes_to_those_awaiting_and_completes_local() { assert_eq!(peers, vec![peer_a.clone()]); assert_eq!( message, - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), + send_pov_message(hash_a, pov_hash, &CompressedPoV::compress(&pov).unwrap()), ); } ) @@ -960,7 +957,7 @@ fn peer_complete_fetch_and_is_rewarded() { &mut ctx, 
NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), + send_pov_message(hash_a, pov_hash, &CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -969,7 +966,7 @@ fn peer_complete_fetch_and_is_rewarded() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_b.clone(), - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), + send_pov_message(hash_a, pov_hash, &CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1050,7 +1047,7 @@ fn peer_punished_for_sending_bad_pov() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&bad_pov).unwrap()), + send_pov_message(hash_a, pov_hash, &CompressedPoV::compress(&bad_pov).unwrap()), ).focus().unwrap(), ).await; @@ -1115,7 +1112,7 @@ fn peer_punished_for_sending_unexpected_pov() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), + send_pov_message(hash_a, pov_hash, &CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1178,7 +1175,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_b, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), + send_pov_message(hash_b, pov_hash, &CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1467,7 +1464,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), + send_pov_message(hash_a, pov_hash, &CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1491,7 +1488,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() { assert_eq!(peers, 
vec![peer_b.clone()]); assert_eq!( message, - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), + send_pov_message(hash_a, pov_hash, &CompressedPoV::compress(&pov).unwrap()), ); } ); @@ -1551,7 +1548,7 @@ fn peer_completing_request_no_longer_awaiting() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), + send_pov_message(hash_a, pov_hash, &CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; diff --git a/polkadot/node/network/protocol/Cargo.toml b/polkadot/node/network/protocol/Cargo.toml index 6c56cbc298..f7a0c72b2a 100644 --- a/polkadot/node/network/protocol/Cargo.toml +++ b/polkadot/node/network/protocol/Cargo.toml @@ -12,8 +12,4 @@ polkadot-node-jaeger = { path = "../../jaeger" } parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } strum = { version = "0.20", features = ["derive"] } -thiserror = "1.0.23" futures = "0.3.12" - -[target.'cfg(not(target_os = "unknown"))'.dependencies] -zstd = "0.5.0" diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs index 5d0702c524..5f89173a58 100644 --- a/polkadot/node/network/protocol/src/lib.rs +++ b/polkadot/node/network/protocol/src/lib.rs @@ -288,10 +288,7 @@ impl View { /// v1 protocol types. 
pub mod v1 { - use polkadot_primitives::v1::{ - Hash, CollatorId, Id as ParaId, ErasureChunk, CandidateReceipt, - SignedAvailabilityBitfield, PoV, CandidateHash, ValidatorIndex, CandidateIndex, AvailableData, - }; + use polkadot_primitives::v1::{AvailableData, CandidateHash, CandidateIndex, CollatorId, CompressedPoV, ErasureChunk, Hash, Id as ParaId, SignedAvailabilityBitfield, ValidatorIndex}; use polkadot_node_primitives::{ SignedFullStatement, approval::{IndirectAssignmentCert, IndirectSignedApprovalVote}, @@ -357,73 +354,6 @@ pub mod v1 { Approvals(Vec), } - #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)] - #[allow(missing_docs)] - pub enum CompressedPoVError { - #[error("Failed to compress a PoV")] - Compress, - #[error("Failed to decompress a PoV")] - Decompress, - #[error("Failed to decode the uncompressed PoV")] - Decode, - #[error("Architecture is not supported")] - NotSupported, - } - - /// SCALE and Zstd encoded [`PoV`]. - #[derive(Clone, Encode, Decode, PartialEq, Eq)] - pub struct CompressedPoV(Vec); - - impl CompressedPoV { - /// Compress the given [`PoV`] and returns a [`CompressedPoV`]. - #[cfg(not(target_os = "unknown"))] - pub fn compress(pov: &PoV) -> Result { - zstd::encode_all(pov.encode().as_slice(), 3).map_err(|_| CompressedPoVError::Compress).map(Self) - } - - /// Compress the given [`PoV`] and returns a [`CompressedPoV`]. - #[cfg(target_os = "unknown")] - pub fn compress(_: &PoV) -> Result { - Err(CompressedPoVError::NotSupported) - } - - /// Decompress `self` and returns the [`PoV`] on success. 
- #[cfg(not(target_os = "unknown"))] - pub fn decompress(&self) -> Result { - use std::io::Read; - const MAX_POV_BLOCK_SIZE: usize = 32 * 1024 * 1024; - - struct InputDecoder<'a, T: std::io::BufRead>(&'a mut zstd::Decoder, usize); - impl<'a, T: std::io::BufRead> parity_scale_codec::Input for InputDecoder<'a, T> { - fn read(&mut self, into: &mut [u8]) -> Result<(), parity_scale_codec::Error> { - self.1 = self.1.saturating_add(into.len()); - if self.1 > MAX_POV_BLOCK_SIZE { - return Err("pov block too big".into()) - } - self.0.read_exact(into).map_err(Into::into) - } - fn remaining_len(&mut self) -> Result, parity_scale_codec::Error> { - Ok(None) - } - } - - let mut decoder = zstd::Decoder::new(self.0.as_slice()).map_err(|_| CompressedPoVError::Decompress)?; - PoV::decode(&mut InputDecoder(&mut decoder, 0)).map_err(|_| CompressedPoVError::Decode) - } - - /// Decompress `self` and returns the [`PoV`] on success. - #[cfg(target_os = "unknown")] - pub fn decompress(&self) -> Result { - Err(CompressedPoVError::NotSupported) - } - } - - impl std::fmt::Debug for CompressedPoV { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "CompressedPoV({} bytes)", self.0.len()) - } - } - /// Network messages used by the collator protocol subsystem #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum CollatorProtocolMessage { @@ -434,12 +364,6 @@ pub mod v1 { /// that they are a collator with given ID. #[codec(index = 1)] AdvertiseCollation(Hash, ParaId), - /// Request the advertised collation at that relay-parent. - #[codec(index = 2)] - RequestCollation(RequestId, Hash, ParaId), - /// A requested collation. - #[codec(index = 3)] - Collation(RequestId, CandidateReceipt, CompressedPoV), /// A collation sent to a validator was seconded. 
#[codec(index = 4)] CollationSeconded(SignedFullStatement), @@ -481,17 +405,3 @@ pub mod v1 { impl_try_from!(CollationProtocol, CollatorProtocol, CollatorProtocolMessage); } - -#[cfg(test)] -mod tests { - use polkadot_primitives::v1::PoV; - use super::v1::{CompressedPoV, CompressedPoVError}; - - #[test] - fn decompress_huge_pov_block_fails() { - let pov = PoV { block_data: vec![0; 63 * 1024 * 1024].into() }; - - let compressed = CompressedPoV::compress(&pov).unwrap(); - assert_eq!(CompressedPoVError::Decode, compressed.decompress().unwrap_err()); - } -} diff --git a/polkadot/node/network/protocol/src/request_response.rs b/polkadot/node/network/protocol/src/request_response/mod.rs similarity index 79% rename from polkadot/node/network/protocol/src/request_response.rs rename to polkadot/node/network/protocol/src/request_response/mod.rs index 75eb33cfab..53d12926c4 100644 --- a/polkadot/node/network/protocol/src/request_response.rs +++ b/polkadot/node/network/protocol/src/request_response/mod.rs @@ -43,7 +43,7 @@ pub use sc_network::config::RequestResponseConfig; /// All requests that can be sent to the network bridge. pub mod request; -pub use request::{IncomingRequest, OutgoingRequest, Requests}; +pub use request::{IncomingRequest, OutgoingRequest, Requests, Recipient, OutgoingResult}; ///// Multiplexer for incoming requests. // pub mod multiplexer; @@ -57,6 +57,8 @@ pub mod v1; pub enum Protocol { /// Protocol for availability fetching, used by availability distribution. AvailabilityFetching, + /// Protocol for fetching collations from collators. + CollationFetching, } /// Default request timeout in seconds. @@ -66,6 +68,10 @@ pub enum Protocol { /// sets. const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(3); +/// Request timeout where we can assume the connection is already open (e.g. we have peers in a +/// peer set as well). 
+const DEFAULT_REQUEST_TIMEOUT_CONNECTED: Duration = Duration::from_secs(1); + impl Protocol { /// Get a configuration for a given Request response protocol. /// @@ -85,14 +91,22 @@ impl Protocol { let cfg = match self { Protocol::AvailabilityFetching => RequestResponseConfig { name: p_name, - // Arbitrary very conservative numbers: - // TODO: Get better numbers, see https://github.com/paritytech/polkadot/issues/2370 - max_request_size: 10_000, - max_response_size: 1_000_000, - // Also just some relative conservative guess: + max_request_size: 1_000, + max_response_size: 100_000, request_timeout: DEFAULT_REQUEST_TIMEOUT, inbound_queue: Some(tx), }, + Protocol::CollationFetching => RequestResponseConfig { + name: p_name, + max_request_size: 1_000, + /// Collations are expected to be around 10Meg, probably much smaller with + /// compression. So 10Meg should be sufficient, we might be able to reduce this + /// further. + max_response_size: 10_000_000, + // Taken from initial implementation in collator protocol: + request_timeout: DEFAULT_REQUEST_TIMEOUT_CONNECTED, + inbound_queue: Some(tx), + }, }; (rx, cfg) } @@ -106,6 +120,8 @@ impl Protocol { // assuming we can service requests relatively quickly, which would need to be measured // as well. Protocol::AvailabilityFetching => 100, + // 10 seems reasonable, considering group sizes of max 10 validators. 
+ Protocol::CollationFetching => 10, } } @@ -118,6 +134,7 @@ impl Protocol { pub const fn get_protocol_name_static(self) -> &'static str { match self { Protocol::AvailabilityFetching => "/polkadot/req_availability/1", + Protocol::CollationFetching => "/polkadot/req_collation/1", } } } diff --git a/polkadot/node/network/protocol/src/request_response/request.rs b/polkadot/node/network/protocol/src/request_response/request.rs index 2f08651017..da0d4cd53b 100644 --- a/polkadot/node/network/protocol/src/request_response/request.rs +++ b/polkadot/node/network/protocol/src/request_response/request.rs @@ -40,6 +40,8 @@ pub trait IsRequest { pub enum Requests { /// Request an availability chunk from a node. AvailabilityFetching(OutgoingRequest), + /// Fetch a collation from a collator which previously announced it. + CollationFetching(OutgoingRequest), } impl Requests { @@ -47,6 +49,7 @@ impl Requests { pub fn get_protocol(&self) -> Protocol { match self { Self::AvailabilityFetching(_) => Protocol::AvailabilityFetching, + Self::CollationFetching(_) => Protocol::CollationFetching, } } @@ -60,10 +63,20 @@ impl Requests { pub fn encode_request(self) -> (Protocol, OutgoingRequest>) { match self { Self::AvailabilityFetching(r) => r.encode_request(), + Self::CollationFetching(r) => r.encode_request(), } } } +/// Potential recipients of an outgoing request. +#[derive(Debug, Eq, Hash, PartialEq)] +pub enum Recipient { + /// Recipient is a regular peer and we know its peer id. + Peer(PeerId), + /// Recipient is a validator, we address it via this `AuthorityDiscoveryId`. + Authority(AuthorityDiscoveryId), +} + /// A request to be sent to the network bridge, including a sender for sending responses/failures. /// /// The network implementation will make use of that sender for informing the requesting subsystem @@ -71,7 +84,7 @@ impl Requests { #[derive(Debug)] pub struct OutgoingRequest { /// Intendent recipient of this request. 
- pub peer: AuthorityDiscoveryId, + pub peer: Recipient, /// The actual request to send over the wire. pub payload: Req, /// Sender which is used by networking to get us back a response. @@ -90,6 +103,9 @@ pub enum RequestError { Canceled(oneshot::Canceled), } +/// Responses received for an `OutgoingRequest`. +pub type OutgoingResult = Result; + impl OutgoingRequest where Req: IsRequest + Encode, @@ -100,11 +116,11 @@ where /// It will contain a sender that is used by the networking for sending back responses. The /// connected receiver is returned as the second element in the returned tuple. pub fn new( - peer: AuthorityDiscoveryId, + peer: Recipient, payload: Req, ) -> ( Self, - impl Future>, + impl Future>, ) { let (tx, rx) = oneshot::channel(); let r = Self { @@ -201,7 +217,7 @@ where /// Future for actually receiving a typed response for an OutgoingRequest. async fn receive_response( rec: oneshot::Receiver, network::RequestFailure>>, -) -> Result +) -> OutgoingResult where Req: IsRequest, Req::Response: Decode, diff --git a/polkadot/node/network/protocol/src/request_response/v1.rs b/polkadot/node/network/protocol/src/request_response/v1.rs index 4f8c968b8f..269b55424b 100644 --- a/polkadot/node/network/protocol/src/request_response/v1.rs +++ b/polkadot/node/network/protocol/src/request_response/v1.rs @@ -18,7 +18,8 @@ use parity_scale_codec::{Decode, Encode}; -use polkadot_primitives::v1::{CandidateHash, ErasureChunk, ValidatorIndex}; +use polkadot_primitives::v1::{CandidateHash, CandidateReceipt, ErasureChunk, ValidatorIndex, CompressedPoV, Hash}; +use polkadot_primitives::v1::Id as ParaId; use super::request::IsRequest; use super::Protocol; @@ -78,3 +79,25 @@ impl IsRequest for AvailabilityFetchingRequest { type Response = AvailabilityFetchingResponse; const PROTOCOL: Protocol = Protocol::AvailabilityFetching; } + +/// Request the advertised collation at that relay-parent. 
+#[derive(Debug, Clone, Encode, Decode)] +pub struct CollationFetchingRequest { + /// Relay parent we want a collation for. + pub relay_parent: Hash, + /// The `ParaId` of the collation. + pub para_id: ParaId, +} + +/// Responses as sent by collators. +#[derive(Debug, Clone, Encode, Decode)] +pub enum CollationFetchingResponse { + /// Deliver requested collation. + #[codec(index = 0)] + Collation(CandidateReceipt, CompressedPoV), +} + +impl IsRequest for CollationFetchingRequest { + type Response = CollationFetchingResponse; + const PROTOCOL: Protocol = Protocol::CollationFetching; +} diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 7800b46317..0df612bd37 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -24,6 +24,9 @@ use futures::channel::{mpsc, oneshot}; use thiserror::Error; + +pub use sc_network::IfDisconnected; + use polkadot_node_network_protocol::{ peer_set::PeerSet, v1 as protocol_v1, UnifiedReputationChange, PeerId, request_response::{Requests, request::IncomingRequest, v1 as req_res_v1}, @@ -198,21 +201,8 @@ pub enum CollatorProtocolMessage { /// Get a network bridge update. #[from] NetworkBridgeUpdateV1(NetworkBridgeEvent), -} - -impl CollatorProtocolMessage { - /// If the current variant contains the relay parent hash, return it. - pub fn relay_parent(&self) -> Option { - match self { - Self::CollateOn(_) => None, - Self::DistributeCollation(receipt, _, _) => Some(receipt.descriptor().relay_parent), - Self::FetchCollation(relay_parent, _, _, _) => Some(*relay_parent), - Self::ReportCollator(_) => None, - Self::NoteGoodCollation(_) => None, - Self::NetworkBridgeUpdateV1(_) => None, - Self::NotifyCollationSeconded(_, _) => None, - } - } + /// Incoming network request for a collation. + CollationFetchingRequest(IncomingRequest) } /// Messages received by the network bridge subsystem. 
@@ -234,7 +224,8 @@ pub enum NetworkBridgeMessage { SendCollationMessages(Vec<(Vec, protocol_v1::CollationProtocol)>), /// Send requests via substrate request/response. - SendRequests(Vec), + /// Second parameter, tells what to do if we are not yet connected to the peer. + SendRequests(Vec, IfDisconnected), /// Connect to peers who represent the given `validator_ids`. /// @@ -750,3 +741,13 @@ impl From> for AllMessa From::::from(From::from(req)) } } +impl From> for AllMessages { + fn from(req: IncomingRequest) -> Self { + From::::from(From::from(req)) + } +} +impl From> for CollatorProtocolMessage { + fn from(req: IncomingRequest) -> Self { + Self::CollationFetchingRequest(req) + } +} diff --git a/polkadot/primitives/Cargo.toml b/polkadot/primitives/Cargo.toml index 8f12f6dcaa..0c887ece50 100644 --- a/polkadot/primitives/Cargo.toml +++ b/polkadot/primitives/Cargo.toml @@ -26,6 +26,10 @@ bitvec = { version = "0.20.1", default-features = false, features = ["alloc"] } frame-system = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } hex-literal = "0.3.1" parity-util-mem = { version = "0.9.0", default-features = false, optional = true } +thiserror = "1.0.23" + +[target.'cfg(not(target_os = "unknown"))'.dependencies] +zstd = "0.5.0" [dev-dependencies] sp-serializer = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/primitives/src/v1.rs b/polkadot/primitives/src/v1.rs index c3d171338b..8aa4ee8b24 100644 --- a/polkadot/primitives/src/v1.rs +++ b/polkadot/primitives/src/v1.rs @@ -454,6 +454,76 @@ impl PoV { } } +/// SCALE and Zstd encoded [`PoV`]. 
+#[derive(Clone, Encode, Decode, PartialEq, Eq)] +pub struct CompressedPoV(Vec); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)] +#[cfg(feature = "std")] +#[allow(missing_docs)] +pub enum CompressedPoVError { + #[error("Failed to compress a PoV")] + Compress, + #[error("Failed to decompress a PoV")] + Decompress, + #[error("Failed to decode the uncompressed PoV")] + Decode, + #[error("Architecture is not supported")] + NotSupported, +} + +#[cfg(feature = "std")] +impl CompressedPoV { + /// Compress the given [`PoV`] and returns a [`CompressedPoV`]. + #[cfg(not(target_os = "unknown"))] + pub fn compress(pov: &PoV) -> Result { + zstd::encode_all(pov.encode().as_slice(), 3).map_err(|_| CompressedPoVError::Compress).map(Self) + } + + /// Compress the given [`PoV`] and returns a [`CompressedPoV`]. + #[cfg(target_os = "unknown")] + pub fn compress(_: &PoV) -> Result { + Err(CompressedPoVError::NotSupported) + } + + /// Decompress `self` and returns the [`PoV`] on success. + #[cfg(not(target_os = "unknown"))] + pub fn decompress(&self) -> Result { + use std::io::Read; + const MAX_POV_BLOCK_SIZE: usize = 32 * 1024 * 1024; + + struct InputDecoder<'a, T: std::io::BufRead>(&'a mut zstd::Decoder, usize); + impl<'a, T: std::io::BufRead> parity_scale_codec::Input for InputDecoder<'a, T> { + fn read(&mut self, into: &mut [u8]) -> Result<(), parity_scale_codec::Error> { + self.1 = self.1.saturating_add(into.len()); + if self.1 > MAX_POV_BLOCK_SIZE { + return Err("pov block too big".into()) + } + self.0.read_exact(into).map_err(Into::into) + } + fn remaining_len(&mut self) -> Result, parity_scale_codec::Error> { + Ok(None) + } + } + + let mut decoder = zstd::Decoder::new(self.0.as_slice()).map_err(|_| CompressedPoVError::Decompress)?; + PoV::decode(&mut InputDecoder(&mut decoder, 0)).map_err(|_| CompressedPoVError::Decode) + } + + /// Decompress `self` and returns the [`PoV`] on success. 
+ #[cfg(target_os = "unknown")] + pub fn decompress(&self) -> Result { + Err(CompressedPoVError::NotSupported) + } +} + +#[cfg(feature = "std")] +impl std::fmt::Debug for CompressedPoV { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "CompressedPoV({} bytes)", self.0.len()) + } +} + /// A bitfield concerning availability of backed candidates. #[derive(PartialEq, Eq, Clone, Encode, Decode, RuntimeDebug)] pub struct AvailabilityBitfield(pub BitVec); @@ -659,7 +729,7 @@ impl GroupRotationInfo { #[derive(Clone, Encode, Decode)] #[cfg_attr(feature = "std", derive(Debug, PartialEq, MallocSizeOf))] pub struct OccupiedCore { - // NOTE: this has no ParaId as it can be deduced from the candidate descriptor. + // NOTE: this has no ParaId as it can be deduced from the candidate descriptor. /// If this core is freed by availability, this is the assignment that is next up on this /// core, if any. None if there is nothing queued for this core. @@ -982,6 +1052,7 @@ pub struct AbridgedHrmpChannel { #[cfg(test)] mod tests { use super::*; + use super::{CompressedPoV, CompressedPoVError, PoV}; #[test] fn group_rotation_info_calculations() { @@ -1008,4 +1079,14 @@ mod tests { &Hash::repeat_byte(3), ); } + + + #[cfg(not(target_os = "unknown"))] + #[test] + fn decompress_huge_pov_block_fails() { + let pov = PoV { block_data: vec![0; 63 * 1024 * 1024].into() }; + + let compressed = CompressedPoV::compress(&pov).unwrap(); + assert_eq!(CompressedPoVError::Decode, compressed.decompress().unwrap_err()); + } } diff --git a/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md b/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md index 87400dd87e..757f258f37 100644 --- a/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md +++ b/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md @@ -20,7 +20,7 @@ Input: Output: -- 
NetworkBridgeMessage::SendRequests(`[Requests]`) +- NetworkBridgeMessage::SendRequests(`[Requests]`, IfDisconnected::TryConnect) - AvailabilityStore::QueryChunk(candidate_hash, index, response_channel) - AvailabilityStore::StoreChunk(candidate_hash, chunk) - RuntimeApiRequest::SessionIndexForChild diff --git a/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md b/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md index 1a8392aaa7..e0968e7fd0 100644 --- a/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md @@ -100,14 +100,13 @@ digraph G { } ``` -When peers connect to us, they can `Declare` that they represent a collator with given public key. Once they've declared that, they can begin to send advertisements of collations. The peers should not send us any advertisements for collations that are on a relay-parent outside of our view. +When peers connect to us, they can `Declare` that they represent a collator with given public key. Once they've declared that, and we checked their signature, they can begin to send advertisements of collations. The peers should not send us any advertisements for collations that are on a relay-parent outside of our view. The protocol tracks advertisements received and the source of the advertisement. The advertisement source is the `PeerId` of the peer who sent the message. We accept one advertisement per collator per source per relay-parent. +As a validator, we will handle requests from other subsystems to fetch a collation on a specific `ParaId` and relay-parent. These requests are made with the request response protocol `CollationFetchingRequest` request. To do so, we need to first check if we have already gathered a collation on that `ParaId` and relay-parent. If not, we need to select one of the advertisements and issue a request for it. 
If we've already issued a request, we shouldn't issue another one until the first has returned. -As a validator, we will handle requests from other subsystems to fetch a collation on a specific `ParaId` and relay-parent. These requests are made with the [`CollatorProtocolMessage`][CPM]`::FetchCollation`. To do so, we need to first check if we have already gathered a collation on that `ParaId` and relay-parent. If not, we need to select one of the advertisements and issue a request for it. If we've already issued a request, we shouldn't issue another one until the first has returned. - -When acting on an advertisement, we issue a `WireMessage::RequestCollation`. If the request times out, we need to note the collator as being unreliable and reduce its priority relative to other collators. And then make another request - repeat until we get a response or the chain has moved on. +When acting on an advertisement, we issue a `Requests::CollationFetching`. If the request times out, we need to note the collator as being unreliable and reduce its priority relative to other collators. As a validator, once the collation has been fetched some other subsystem will inspect and do deeper validation of the collation. The subsystem will report to this subsystem with a [`CollatorProtocolMessage`][CPM]`::ReportCollator` or `NoteGoodCollation` message. In that case, if we are connected directly to the collator, we apply a cost to the `PeerId` associated with the collator and potentially disconnect or blacklist it.