mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 10:31:03 +00:00
Compress the PoV block before sending it over the network (#2288)
* Compress the PoV block before sending it over the network This pr changes the way we send PoV blocks over the network. We now compress the PoV block before it is send over the network. This should reduce the size significant for PoVs which contain the runtime WASM for example. * Preallocate 1KB * Try something.. * Switch to zstd and some renamings * Make compression/decompression fail in browsers * Use some sane maximum value * Update roadmap/implementers-guide/src/types/network.md Co-authored-by: Andronik Ordian <write@reusable.software> * Fix and add test * add Co-authored-by: Andronik Ordian <write@reusable.software> Co-authored-by: Robert Habermeier <rphmeier@gmail.com>
This commit is contained in:
@@ -106,7 +106,7 @@ struct State {
|
||||
}
|
||||
|
||||
struct BlockBasedState {
|
||||
known: HashMap<Hash, Arc<PoV>>,
|
||||
known: HashMap<Hash, (Arc<PoV>, protocol_v1::CompressedPoV)>,
|
||||
|
||||
/// All the PoVs we are or were fetching, coupled with channels expecting the data.
|
||||
///
|
||||
@@ -131,11 +131,13 @@ fn awaiting_message(relay_parent: Hash, awaiting: Vec<Hash>)
|
||||
)
|
||||
}
|
||||
|
||||
fn send_pov_message(relay_parent: Hash, pov_hash: Hash, pov: PoV)
|
||||
-> protocol_v1::ValidationProtocol
|
||||
{
|
||||
fn send_pov_message(
|
||||
relay_parent: Hash,
|
||||
pov_hash: Hash,
|
||||
pov: &protocol_v1::CompressedPoV,
|
||||
) -> protocol_v1::ValidationProtocol {
|
||||
protocol_v1::ValidationProtocol::PoVDistribution(
|
||||
protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov)
|
||||
protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov.clone())
|
||||
)
|
||||
}
|
||||
|
||||
@@ -267,7 +269,7 @@ async fn distribute_to_awaiting(
|
||||
metrics: &Metrics,
|
||||
relay_parent: Hash,
|
||||
pov_hash: Hash,
|
||||
pov: &PoV,
|
||||
pov: &protocol_v1::CompressedPoV,
|
||||
) {
|
||||
// Send to all peers who are awaiting the PoV and have that relay-parent in their view.
|
||||
//
|
||||
@@ -284,7 +286,7 @@ async fn distribute_to_awaiting(
|
||||
|
||||
if peers_to_send.is_empty() { return; }
|
||||
|
||||
let payload = send_pov_message(relay_parent, pov_hash, pov.clone());
|
||||
let payload = send_pov_message(relay_parent, pov_hash, pov);
|
||||
|
||||
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
|
||||
peers_to_send,
|
||||
@@ -379,7 +381,7 @@ async fn handle_fetch(
|
||||
None => return,
|
||||
};
|
||||
|
||||
if let Some(pov) = relay_parent_state.known.get(&descriptor.pov_hash) {
|
||||
if let Some((pov, _)) = relay_parent_state.known.get(&descriptor.pov_hash) {
|
||||
let _ = response_sender.send(pov.clone());
|
||||
return;
|
||||
}
|
||||
@@ -468,7 +470,17 @@ async fn handle_distribute(
|
||||
}
|
||||
}
|
||||
|
||||
relay_parent_state.known.insert(descriptor.pov_hash, pov.clone());
|
||||
let encoded_pov = match protocol_v1::CompressedPoV::compress(&*pov) {
|
||||
Ok(pov) => pov,
|
||||
Err(error) => {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
error = ?error,
|
||||
"Failed to create `CompressedPov`."
|
||||
);
|
||||
return
|
||||
}
|
||||
};
|
||||
|
||||
distribute_to_awaiting(
|
||||
&mut state.peer_state,
|
||||
@@ -476,8 +488,10 @@ async fn handle_distribute(
|
||||
&state.metrics,
|
||||
relay_parent,
|
||||
descriptor.pov_hash,
|
||||
&*pov,
|
||||
).await
|
||||
&encoded_pov,
|
||||
).await;
|
||||
|
||||
relay_parent_state.known.insert(descriptor.pov_hash, (pov, encoded_pov));
|
||||
}
|
||||
|
||||
/// Report a reputation change for a peer.
|
||||
@@ -527,8 +541,9 @@ async fn handle_awaiting(
|
||||
for pov_hash in pov_hashes {
|
||||
// For all requested PoV hashes, if we have it, we complete the request immediately.
|
||||
// Otherwise, we note that the peer is awaiting the PoV.
|
||||
if let Some(pov) = relay_parent_state.known.get(&pov_hash) {
|
||||
let payload = send_pov_message(relay_parent, pov_hash, (&**pov).clone());
|
||||
if let Some((_, ref pov)) = relay_parent_state.known.get(&pov_hash) {
|
||||
let payload = send_pov_message(relay_parent, pov_hash, pov);
|
||||
|
||||
ctx.send_message(AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
|
||||
)).await;
|
||||
@@ -544,23 +559,35 @@ async fn handle_awaiting(
|
||||
/// Handle an incoming PoV from our peer. Reports them if unexpected, rewards them if not.
|
||||
///
|
||||
/// Completes any requests awaiting that PoV.
|
||||
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
|
||||
#[tracing::instrument(level = "trace", skip(ctx, state, encoded_pov), fields(subsystem = LOG_TARGET))]
|
||||
async fn handle_incoming_pov(
|
||||
state: &mut State,
|
||||
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
|
||||
peer: PeerId,
|
||||
relay_parent: Hash,
|
||||
pov_hash: Hash,
|
||||
pov: PoV,
|
||||
encoded_pov: protocol_v1::CompressedPoV,
|
||||
) {
|
||||
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
|
||||
None => {
|
||||
None => {
|
||||
report_peer(ctx, peer, COST_UNEXPECTED_POV).await;
|
||||
return;
|
||||
},
|
||||
Some(r) => r,
|
||||
};
|
||||
|
||||
let pov = match encoded_pov.decompress() {
|
||||
Ok(pov) => pov,
|
||||
Err(error) => {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
error = ?error,
|
||||
"Could not extract PoV",
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let pov = {
|
||||
// Do validity checks and complete all senders awaiting this PoV.
|
||||
let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) {
|
||||
@@ -607,8 +634,10 @@ async fn handle_incoming_pov(
|
||||
&state.metrics,
|
||||
relay_parent,
|
||||
pov_hash,
|
||||
&*pov,
|
||||
).await
|
||||
&encoded_pov,
|
||||
).await;
|
||||
|
||||
relay_parent_state.known.insert(pov_hash, (pov, encoded_pov));
|
||||
}
|
||||
|
||||
/// Handles a newly connected validator in the context of some relay leaf.
|
||||
|
||||
@@ -396,7 +396,11 @@ fn ask_validators_for_povs() {
|
||||
PoVDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerMessage(
|
||||
test_state.validator_peer_id[2].clone(),
|
||||
protocol_v1::PoVDistributionMessage::SendPoV(current, pov_hash, pov_block.clone()),
|
||||
protocol_v1::PoVDistributionMessage::SendPoV(
|
||||
current,
|
||||
pov_hash,
|
||||
protocol_v1::CompressedPoV::compress(&pov_block).unwrap(),
|
||||
),
|
||||
)
|
||||
)
|
||||
).await;
|
||||
@@ -631,7 +635,7 @@ fn distributes_to_those_awaiting_and_completes_local() {
|
||||
assert_eq!(peers, vec![peer_a.clone()]);
|
||||
assert_eq!(
|
||||
message,
|
||||
send_pov_message(hash_a, pov_hash, pov.clone()),
|
||||
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
|
||||
);
|
||||
}
|
||||
)
|
||||
@@ -943,7 +947,7 @@ fn peer_complete_fetch_and_is_rewarded() {
|
||||
&mut ctx,
|
||||
NetworkBridgeEvent::PeerMessage(
|
||||
peer_a.clone(),
|
||||
send_pov_message(hash_a, pov_hash, pov.clone()),
|
||||
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
|
||||
).focus().unwrap(),
|
||||
).await;
|
||||
|
||||
@@ -952,7 +956,7 @@ fn peer_complete_fetch_and_is_rewarded() {
|
||||
&mut ctx,
|
||||
NetworkBridgeEvent::PeerMessage(
|
||||
peer_b.clone(),
|
||||
send_pov_message(hash_a, pov_hash, pov.clone()),
|
||||
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
|
||||
).focus().unwrap(),
|
||||
).await;
|
||||
|
||||
@@ -1033,7 +1037,7 @@ fn peer_punished_for_sending_bad_pov() {
|
||||
&mut ctx,
|
||||
NetworkBridgeEvent::PeerMessage(
|
||||
peer_a.clone(),
|
||||
send_pov_message(hash_a, pov_hash, bad_pov.clone()),
|
||||
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&bad_pov).unwrap()),
|
||||
).focus().unwrap(),
|
||||
).await;
|
||||
|
||||
@@ -1098,7 +1102,7 @@ fn peer_punished_for_sending_unexpected_pov() {
|
||||
&mut ctx,
|
||||
NetworkBridgeEvent::PeerMessage(
|
||||
peer_a.clone(),
|
||||
send_pov_message(hash_a, pov_hash, pov.clone()),
|
||||
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
|
||||
).focus().unwrap(),
|
||||
).await;
|
||||
|
||||
@@ -1161,7 +1165,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() {
|
||||
&mut ctx,
|
||||
NetworkBridgeEvent::PeerMessage(
|
||||
peer_a.clone(),
|
||||
send_pov_message(hash_b, pov_hash, pov.clone()),
|
||||
send_pov_message(hash_b, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
|
||||
).focus().unwrap(),
|
||||
).await;
|
||||
|
||||
@@ -1450,7 +1454,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() {
|
||||
&mut ctx,
|
||||
NetworkBridgeEvent::PeerMessage(
|
||||
peer_a.clone(),
|
||||
send_pov_message(hash_a, pov_hash, pov.clone()),
|
||||
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
|
||||
).focus().unwrap(),
|
||||
).await;
|
||||
|
||||
@@ -1474,7 +1478,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() {
|
||||
assert_eq!(peers, vec![peer_b.clone()]);
|
||||
assert_eq!(
|
||||
message,
|
||||
send_pov_message(hash_a, pov_hash, pov.clone()),
|
||||
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
|
||||
);
|
||||
}
|
||||
);
|
||||
@@ -1534,7 +1538,7 @@ fn peer_completing_request_no_longer_awaiting() {
|
||||
&mut ctx,
|
||||
NetworkBridgeEvent::PeerMessage(
|
||||
peer_a.clone(),
|
||||
send_pov_message(hash_a, pov_hash, pov.clone()),
|
||||
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
|
||||
).focus().unwrap(),
|
||||
).await;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user