approval-distribution: batched approval/assignment sending (#6401)

* Imple batched send

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Add batch tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* pub MAX_NOTIFICATION_SIZE

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* spell check

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* spellcheck ...

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Fix comment

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* o.O

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* 2 constants

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Ensure batch size is at least 1 element

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* feedback impl

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
This commit is contained in:
Andrei Sandu
2022-12-13 14:06:05 +02:00
committed by GitHub
parent da0b0224c5
commit 7f020eb527
3 changed files with 222 additions and 31 deletions
@@ -24,6 +24,7 @@ use futures::{channel::oneshot, FutureExt as _};
use polkadot_node_network_protocol::{
self as net_protocol,
grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology},
peer_set::MAX_NOTIFICATION_SIZE,
v1 as protocol_v1, PeerId, UnifiedReputationChange as Rep, Versioned, View,
};
use polkadot_node_primitives::approval::{
@@ -1381,14 +1382,7 @@ impl State {
"Sending assignments to unified peer",
);
sender
.send_message(NetworkBridgeTxMessage::SendValidationMessage(
vec![peer_id],
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(assignments_to_send),
)),
))
.await;
send_assignments_batched(sender, assignments_to_send, peer_id).await;
}
if !approvals_to_send.is_empty() {
@@ -1399,14 +1393,7 @@ impl State {
"Sending approvals to unified peer",
);
sender
.send_message(NetworkBridgeTxMessage::SendValidationMessage(
vec![peer_id],
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(approvals_to_send),
)),
))
.await;
send_approvals_batched(sender, approvals_to_send, peer_id).await;
}
}
@@ -1605,23 +1592,11 @@ async fn adjust_required_routing_and_propagate<Context, BlockFilter, RoutingModi
// Send messages in accumulated packets, assignments preceding approvals.
for (peer, assignments_packet) in peer_assignments {
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
vec![peer],
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(assignments_packet),
)),
))
.await;
send_assignments_batched(ctx.sender(), assignments_packet, peer).await;
}
for (peer, approvals_packet) in peer_approvals {
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
vec![peer],
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(approvals_packet),
)),
))
.await;
send_approvals_batched(ctx.sender(), approvals_packet, peer).await;
}
}
@@ -1758,3 +1733,75 @@ impl<Context> ApprovalDistribution {
SpawnedSubsystem { name: "approval-distribution-subsystem", future }
}
}
/// Ensures the batch size is always at least 1 element.
const fn ensure_size_not_zero(size: usize) -> usize {
if 0 == size {
panic!("Batch size must be at least 1 (MAX_NOTIFICATION_SIZE constant is too low)",);
}
size
}
/// The maximum amount of assignments per batch is 33% of maximum allowed by protocol.
/// This is an arbitrary value. Bumping this up increases the maximum amount of approvals or assignments
/// we send in a single message to peers. Exceeding `MAX_NOTIFICATION_SIZE` will violate the protocol
/// configuration.
pub const MAX_ASSIGNMENT_BATCH_SIZE: usize = ensure_size_not_zero(
MAX_NOTIFICATION_SIZE as usize /
std::mem::size_of::<(IndirectAssignmentCert, CandidateIndex)>() /
3,
);
/// The maximum amount of approvals per batch is 33% of maximum allowed by protocol.
pub const MAX_APPROVAL_BATCH_SIZE: usize = ensure_size_not_zero(
MAX_NOTIFICATION_SIZE as usize / std::mem::size_of::<IndirectSignedApprovalVote>() / 3,
);
/// Send assignments while honoring the `max_notification_size` of the protocol.
///
/// Splitting the messages into multiple notifications allows more granular processing at the
/// destination, such that the subsystem doesn't get stuck for long processing a batch
/// of assignments and can `select!` other tasks.
pub(crate) async fn send_assignments_batched(
sender: &mut impl overseer::ApprovalDistributionSenderTrait,
assignments: Vec<(IndirectAssignmentCert, CandidateIndex)>,
peer: PeerId,
) {
let mut batches = assignments.into_iter().peekable();
while batches.peek().is_some() {
let batch: Vec<_> = batches.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect();
sender
.send_message(NetworkBridgeTxMessage::SendValidationMessage(
vec![peer],
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(batch),
)),
))
.await;
}
}
/// Send approvals while honoring the `max_notification_size` of the protocol.
pub(crate) async fn send_approvals_batched(
sender: &mut impl overseer::ApprovalDistributionSenderTrait,
approvals: Vec<IndirectSignedApprovalVote>,
peer: PeerId,
) {
let mut batches = approvals.into_iter().peekable();
while batches.peek().is_some() {
let batch: Vec<_> = batches.by_ref().take(MAX_APPROVAL_BATCH_SIZE).collect();
sender
.send_message(NetworkBridgeTxMessage::SendValidationMessage(
vec![peer],
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(batch),
)),
))
.await;
}
}