Code, PoV compression and remove CompressedPoV struct (#2852)

* use compressed blob in candidate-validation

* add some tests for compressed code blobs

* remove CompressedPoV and apply compression in collation-generation

* decompress BlockData before executing

* don't produce oversized collations

* add test for PoV decompression failure

* fix tests and clean up

* fix test

* address review and fix CI

* take this )
This commit is contained in:
Robert Habermeier
2021-04-08 22:09:36 +02:00
committed by GitHub
parent bb48c47fbf
commit 896ec8dbc3
16 changed files with 489 additions and 380 deletions
@@ -24,7 +24,6 @@ use futures::channel::oneshot;
use polkadot_node_subsystem_util::Error as UtilError;
use polkadot_primitives::v1::SessionIndex;
use polkadot_node_primitives::CompressedPoVError;
use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError};
use crate::LOG_TARGET;
@@ -79,10 +78,6 @@ pub enum Error {
#[error("There was no session with the given index")]
NoSuchSession(SessionIndex),
/// Decompressing PoV failed.
#[error("PoV could not be decompressed")]
PoVDecompression(CompressedPoVError),
/// Fetching PoV failed with `RequestError`.
#[error("FetchPoV request error")]
FetchPoV(#[source] RequestError),
@@ -152,9 +152,7 @@ async fn do_fetch_pov(
{
let response = pending_response.await.map_err(Error::FetchPoV)?;
let pov = match response {
PoVFetchingResponse::PoV(compressed) => {
compressed.decompress().map_err(Error::PoVDecompression)?
}
PoVFetchingResponse::PoV(pov) => pov,
PoVFetchingResponse::NoSuchPoV => {
return Err(Error::NoSuchPoV)
}
@@ -244,7 +242,7 @@ mod tests {
use sp_core::testing::TaskExecutor;
use polkadot_primitives::v1::{CandidateHash, Hash, ValidatorIndex};
use polkadot_node_primitives::{BlockData, CompressedPoV};
use polkadot_node_primitives::BlockData;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_subsystem::messages::{AvailabilityDistributionMessage, RuntimeApiMessage, RuntimeApiRequest};
@@ -315,9 +313,8 @@ mod tests {
reqs.pop(),
Some(Requests::PoVFetching(outgoing)) => {outgoing}
);
req.pending_response.send(Ok(PoVFetchingResponse::PoV(
CompressedPoV::compress(&pov).unwrap()).encode()
)).unwrap();
req.pending_response.send(Ok(PoVFetchingResponse::PoV(pov.clone()).encode()))
.unwrap();
break
},
msg => tracing::debug!(target: LOG_TARGET, msg = ?msg, "Received msg"),
@@ -16,11 +16,13 @@
//! Answer requests for availability chunks.
use std::sync::Arc;
use futures::channel::oneshot;
use polkadot_node_network_protocol::request_response::{request::IncomingRequest, v1};
use polkadot_primitives::v1::{CandidateHash, ValidatorIndex};
use polkadot_node_primitives::{AvailableData, CompressedPoV, ErasureChunk};
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_subsystem::{
messages::{AllMessages, AvailabilityStoreMessage},
SubsystemContext, jaeger,
@@ -100,18 +102,7 @@ where
let response = match av_data {
None => v1::PoVFetchingResponse::NoSuchPoV,
Some(av_data) => {
let pov = match CompressedPoV::compress(&av_data.pov) {
Ok(pov) => pov,
Err(error) => {
tracing::error!(
target: LOG_TARGET,
error = ?error,
"Failed to create `CompressedPov`",
);
// this should really not happen, let this request time out:
return Err(Error::PoVDecompression(error))
}
};
let pov = Arc::try_unwrap(av_data.pov).unwrap_or_else(|a| (&*a).clone());
v1::PoVFetchingResponse::PoV(pov)
}
};
@@ -42,7 +42,7 @@ use polkadot_node_subsystem_util::{
request_availability_cores,
metrics::{self, prometheus},
};
use polkadot_node_primitives::{SignedFullStatement, Statement, PoV, CompressedPoV};
use polkadot_node_primitives::{SignedFullStatement, Statement, PoV};
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
@@ -660,27 +660,6 @@ async fn send_collation(
receipt: CandidateReceipt,
pov: PoV,
) {
let pov = match CompressedPoV::compress(&pov) {
Ok(compressed) => {
tracing::trace!(
target: LOG_TARGET,
size = %pov.block_data.0.len(),
compressed = %compressed.len(),
peer_id = ?request.peer,
"Sending collation."
);
compressed
},
Err(error) => {
tracing::error!(
target: LOG_TARGET,
?error,
"Failed to create `CompressedPov`",
);
return
}
};
if let Err(_) = request.send_response(CollationFetchingResponse::Collation(receipt, pov)) {
tracing::warn!(
target: LOG_TARGET,
@@ -1519,7 +1498,7 @@ mod tests {
)
.expect("Decoding should work");
assert_eq!(receipt, candidate);
assert_eq!(pov.decompress().unwrap(), pov_block);
assert_eq!(pov, pov_block);
}
);
@@ -1158,49 +1158,33 @@ where
modify_reputation(ctx, *peer_id, COST_WRONG_PARA).await;
}
Ok(CollationFetchingResponse::Collation(receipt, compressed_pov)) => {
match compressed_pov.decompress() {
Ok(pov) => {
tracing::debug!(
target: LOG_TARGET,
para_id = %para_id,
hash = ?hash,
candidate_hash = ?receipt.hash(),
"Received collation",
);
Ok(CollationFetchingResponse::Collation(receipt, pov)) => {
tracing::debug!(
target: LOG_TARGET,
para_id = %para_id,
hash = ?hash,
candidate_hash = ?receipt.hash(),
"Received collation",
);
// Actual sending:
let _span = jaeger::Span::new(&pov, "received-collation");
let (mut tx, _) = oneshot::channel();
std::mem::swap(&mut tx, &mut (per_req.to_requester));
let result = tx.send((receipt, pov));
// Actual sending:
let _span = jaeger::Span::new(&pov, "received-collation");
let (mut tx, _) = oneshot::channel();
std::mem::swap(&mut tx, &mut (per_req.to_requester));
let result = tx.send((receipt, pov));
if let Err(_) = result {
tracing::warn!(
target: LOG_TARGET,
hash = ?hash,
para_id = ?para_id,
peer_id = ?peer_id,
"Sending response back to requester failed (receiving side closed)"
);
} else {
metrics_result = Ok(());
success = "true";
}
}
Err(error) => {
tracing::warn!(
target: LOG_TARGET,
hash = ?hash,
para_id = ?para_id,
peer_id = ?peer_id,
?error,
"Failed to extract PoV",
);
modify_reputation(ctx, *peer_id, COST_CORRUPTED_MESSAGE).await;
}
};
if let Err(_) = result {
tracing::warn!(
target: LOG_TARGET,
hash = ?hash,
para_id = ?para_id,
peer_id = ?peer_id,
"Sending response back to requester failed (receiving side closed)"
);
} else {
metrics_result = Ok(());
success = "true";
}
}
};
metrics.on_request(metrics_result);
@@ -1227,7 +1211,7 @@ mod tests {
CollatorPair, ValidatorId, ValidatorIndex, CoreState, CandidateDescriptor,
GroupRotationInfo, ScheduledCore, OccupiedCore, GroupIndex,
};
use polkadot_node_primitives::{BlockData, CompressedPoV};
use polkadot_node_primitives::BlockData;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_subsystem::messages::{RuntimeApiMessage, RuntimeApiRequest};
@@ -1859,9 +1843,9 @@ mod tests {
response_channel.send(Ok(
CollationFetchingResponse::Collation(
candidate_a.clone(),
CompressedPoV::compress(&PoV {
PoV {
block_data: BlockData(vec![]),
}).unwrap(),
},
).encode()
)).expect("Sending response should succeed");
@@ -1889,9 +1873,9 @@ mod tests {
response_channel.send(Ok(
CollationFetchingResponse::Collation(
candidate_b.clone(),
CompressedPoV::compress(&PoV {
PoV {
block_data: BlockData(vec![1, 2, 3]),
}).unwrap(),
},
).encode()
)).expect("Sending response should succeed");
@@ -36,7 +36,7 @@ use std::borrow::Cow;
use std::time::Duration;
use futures::channel::mpsc;
use polkadot_node_primitives::MAX_COMPRESSED_POV_SIZE;
use polkadot_node_primitives::MAX_POV_SIZE;
use strum::EnumIter;
pub use sc_network::config as network;
@@ -84,7 +84,7 @@ const MIN_BANDWIDTH_BYTES: u64 = 50 * 1024 * 1024;
/// Timeout for PoV like data, 2 times what it should take, assuming we can fully utilize the
/// bandwidth. This amounts to two seconds right now.
const POV_REQUEST_TIMEOUT_CONNECTED: Duration =
Duration::from_millis(2 * 1000 * (MAX_COMPRESSED_POV_SIZE as u64) / MIN_BANDWIDTH_BYTES);
Duration::from_millis(2 * 1000 * (MAX_POV_SIZE as u64) / MIN_BANDWIDTH_BYTES);
impl Protocol {
/// Get a configuration for a given Request response protocol.
@@ -114,7 +114,7 @@ impl Protocol {
Protocol::CollationFetching => RequestResponseConfig {
name: p_name,
max_request_size: 10_000,
max_response_size: MAX_COMPRESSED_POV_SIZE as u64,
max_response_size: MAX_POV_SIZE as u64,
// Taken from initial implementation in collator protocol:
request_timeout: POV_REQUEST_TIMEOUT_CONNECTED,
inbound_queue: Some(tx),
@@ -122,7 +122,7 @@ impl Protocol {
Protocol::PoVFetching => RequestResponseConfig {
name: p_name,
max_request_size: 1_000,
max_response_size: MAX_COMPRESSED_POV_SIZE as u64,
max_response_size: MAX_POV_SIZE as u64,
request_timeout: POV_REQUEST_TIMEOUT_CONNECTED,
inbound_queue: Some(tx),
},
@@ -130,7 +130,7 @@ impl Protocol {
name: p_name,
max_request_size: 1_000,
// Available data size is dominated by the PoV size.
max_response_size: MAX_COMPRESSED_POV_SIZE as u64,
max_response_size: MAX_POV_SIZE as u64,
request_timeout: POV_REQUEST_TIMEOUT_CONNECTED,
inbound_queue: Some(tx),
},
@@ -23,7 +23,7 @@ use polkadot_primitives::v1::{
Hash,
};
use polkadot_primitives::v1::Id as ParaId;
use polkadot_node_primitives::{AvailableData, CompressedPoV, ErasureChunk};
use polkadot_node_primitives::{AvailableData, PoV, ErasureChunk};
use super::request::IsRequest;
use super::Protocol;
@@ -107,7 +107,7 @@ pub struct CollationFetchingRequest {
pub enum CollationFetchingResponse {
/// Deliver requested collation.
#[codec(index = 0)]
Collation(CandidateReceipt, CompressedPoV),
Collation(CandidateReceipt, PoV),
}
impl IsRequest for CollationFetchingRequest {
@@ -127,7 +127,7 @@ pub struct PoVFetchingRequest {
pub enum PoVFetchingResponse {
/// Deliver requested PoV.
#[codec(index = 0)]
PoV(CompressedPoV),
PoV(PoV),
/// PoV was not found in store.
#[codec(index = 1)]
NoSuchPoV,