approval-distribution: limit the amount of assignments on unify (#2737)

* approval-distribution: limit the amount of packets on unify

* guide: fix a typo

* compilation fix

* grammar

* Update roadmap/implementers-guide/src/node/approval/approval-distribution.md

Co-authored-by: David <dvdplm@gmail.com>

* more grammar

* propagate only local assignments/approvals after a certain depth

* increase the threshold

* guides update

Co-authored-by: David <dvdplm@gmail.com>
This commit is contained in:
Andronik Ordian
2021-03-28 23:30:06 +02:00
committed by GitHub
parent 171fc69961
commit 6f464a360f
2 changed files with 96 additions and 40 deletions
@@ -114,12 +114,20 @@ enum ApprovalState {
Approved(AssignmentCert, ValidatorSignature),
}
#[derive(Debug, Clone, Copy)]
enum LocalSource {
Yes,
No,
}
type BlockDepth = usize;
/// Information about candidates in the context of a particular block they are included in.
/// In other words, multiple `CandidateEntry`s may exist for the same candidate,
/// if it is included by multiple blocks - this is likely the case when there are forks.
#[derive(Debug, Default)]
struct CandidateEntry {
approvals: HashMap<ValidatorIndex, ApprovalState>,
approvals: HashMap<ValidatorIndex, (ApprovalState, LocalSource)>,
}
#[derive(Debug, Clone)]
@@ -135,6 +143,13 @@ impl MessageSource {
Self::Local => None,
}
}
fn as_local_source(&self) -> LocalSource {
match self {
Self::Local => LocalSource::Yes,
_ => LocalSource::No,
}
}
}
enum PendingMessage {
@@ -229,8 +244,6 @@ impl State {
);
{
let _timer = metrics.time_import_pending_now_known();
let pending_now_known = self.pending_known.keys()
.filter(|k| self.blocks.contains_key(k))
.copied()
@@ -241,24 +254,34 @@ impl State {
.flatten()
.collect::<Vec<_>>();
for (peer_id, message) in to_import {
match message {
PendingMessage::Assignment(assignment, claimed_index) => {
self.import_and_circulate_assignment(
ctx,
metrics,
MessageSource::Peer(peer_id),
assignment,
claimed_index,
).await;
}
PendingMessage::Approval(approval_vote) => {
self.import_and_circulate_approval(
ctx,
metrics,
MessageSource::Peer(peer_id),
approval_vote,
).await;
if !to_import.is_empty() {
tracing::debug!(
target: LOG_TARGET,
num = to_import.len(),
"Processing pending assignment/approvals",
);
let _timer = metrics.time_import_pending_now_known();
for (peer_id, message) in to_import {
match message {
PendingMessage::Assignment(assignment, claimed_index) => {
self.import_and_circulate_assignment(
ctx,
metrics,
MessageSource::Peer(peer_id),
assignment,
claimed_index,
).await;
}
PendingMessage::Approval(approval_vote) => {
self.import_and_circulate_approval(
ctx,
metrics,
MessageSource::Peer(peer_id),
approval_vote,
).await;
}
}
}
}
@@ -489,6 +512,7 @@ impl State {
tx,
))).await;
let timer = metrics.time_awaiting_approval_voting();
let result = match rx.await {
Ok(result) => result,
Err(_) => {
@@ -499,6 +523,7 @@ impl State {
return;
}
};
drop(timer);
tracing::trace!(
target: LOG_TARGET,
@@ -551,6 +576,8 @@ impl State {
}
}
let local_source = source.as_local_source();
// Invariant: none of the peers except for the `source` know about the assignment.
metrics.on_assignment_imported();
@@ -560,7 +587,7 @@ impl State {
// unless the approval state is set already
candidate_entry.approvals
.entry(validator_index)
.or_insert_with(|| ApprovalState::Assigned(assignment.cert.clone()));
.or_insert_with(|| (ApprovalState::Assigned(assignment.cert.clone()), local_source));
}
None => {
tracing::warn!(
@@ -596,10 +623,11 @@ impl State {
if !peers.is_empty() {
tracing::trace!(
target: LOG_TARGET,
"Sending assignment (block={}, index={}) to {} peers",
block_hash,
claimed_candidate_index,
peers.len(),
?block_hash,
?claimed_candidate_index,
?local_source,
num_peers = peers.len(),
"Sending an assignment to peers",
);
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
@@ -762,6 +790,8 @@ impl State {
}
}
let local_source = source.as_local_source();
// Invariant: none of the peers except for the `source` know about the approval.
metrics.on_approval_imported();
@@ -770,10 +800,10 @@ impl State {
// set the approval state for validator_index to Approved
// it should be in assigned state already
match candidate_entry.approvals.remove(&validator_index) {
Some(ApprovalState::Assigned(cert)) => {
Some((ApprovalState::Assigned(cert), _local)) => {
candidate_entry.approvals.insert(
validator_index,
ApprovalState::Approved(cert, vote.signature.clone()),
(ApprovalState::Approved(cert, vote.signature.clone()), local_source),
);
}
_ => {
@@ -819,10 +849,11 @@ impl State {
if !peers.is_empty() {
tracing::trace!(
target: LOG_TARGET,
"Sending approval (block={}, index={}) to {} peers",
block_hash,
candidate_index,
peers.len(),
?block_hash,
?candidate_index,
?local_source,
num_peers = peers.len(),
"Sending an approval to peers",
);
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
@@ -843,7 +874,7 @@ impl State {
) {
metrics.on_unify_with_peer();
let _timer = metrics.time_unify_with_peer();
let mut to_send = HashSet::new();
let mut to_send: Vec<(BlockDepth, Hash)> = Vec::new();
let view_finalized_number = view.finalized_number;
for head in view.into_iter() {
@@ -867,7 +898,7 @@ impl State {
block = entry.parent_hash.clone();
Some(interesting_block)
});
to_send.extend(interesting_blocks);
to_send.extend(interesting_blocks.enumerate());
}
// step 6.
// send all assignments and approvals for all candidates in those blocks to the peer
@@ -883,12 +914,16 @@ impl State {
entries: &HashMap<Hash, BlockEntry>,
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
peer_id: PeerId,
blocks: HashSet<Hash>,
blocks: Vec<(BlockDepth, Hash)>,
) {
// we will only propagate local assignment/approvals after a certain depth
const DEPTH_THRESHOLD: usize = 5;
let mut assignments = Vec::new();
let mut approvals = Vec::new();
let num_blocks = blocks.len();
for block in blocks.into_iter() {
for (depth, block) in blocks.into_iter() {
let entry = match entries.get(&block) {
Some(entry) => entry,
None => continue, // should be unreachable
@@ -903,7 +938,10 @@ impl State {
for (candidate_index, candidate_entry) in entry.candidates.iter().enumerate() {
let candidate_index = candidate_index as u32;
for (validator_index, approval_state) in candidate_entry.approvals.iter() {
for (validator_index, (approval_state, is_local)) in candidate_entry.approvals.iter() {
if depth >= DEPTH_THRESHOLD && !matches!(is_local, LocalSource::Yes) {
continue;
}
match approval_state {
ApprovalState::Assigned(cert) => {
assignments.push((IndirectAssignmentCert {
@@ -926,6 +964,14 @@ impl State {
}
if !assignments.is_empty() {
tracing::trace!(
target: LOG_TARGET,
num = assignments.len(),
?num_blocks,
?peer_id,
"Sending assignments to a peer",
);
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
vec![peer_id.clone()],
protocol_v1::ValidationProtocol::ApprovalDistribution(
@@ -935,6 +981,14 @@ impl State {
}
if !approvals.is_empty() {
tracing::trace!(
target: LOG_TARGET,
num = approvals.len(),
?num_blocks,
?peer_id,
"Sending approvals to a peer",
);
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
vec![peer_id],
protocol_v1::ValidationProtocol::ApprovalDistribution(