Batch messages to network bridge and introduce a timeout to SubsystemContext::send_message (#2197)

* guide: batch network messages

* bridge: batch

* av-dist: batch outgoing messages

* time-out message sends in subsystem context

* Update node/subsystem/src/messages.rs

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Revert "time-out message sends in subsystem context"

This reverts commit d49be62557ec37c8a350b93718acad723df704ef.

* Update node/network/availability-distribution/src/lib.rs

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Robert Habermeier
2021-01-07 16:01:03 -05:00
committed by GitHub
parent b97f52a4c8
commit 9dee08af2f
6 changed files with 111 additions and 54 deletions
@@ -394,6 +394,7 @@ where
}
// handle all candidates
let mut messages_out = Vec::new();
for candidate_hash in state.cached_live_candidates_unioned(view.difference(&old_view)) {
// If we are not a validator for this candidate, let's skip it.
match state.per_candidate.get(&candidate_hash) {
@@ -475,13 +476,16 @@ where
.cloned()
.collect::<Vec<_>>();
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await;
add_tracked_messages_to_batch(&mut messages_out, per_candidate, metrics, peers, iter::once(message));
}
// traces are better if we wait until the loop is done to drop.
per_candidate.drop_span_after_own_availability();
}
// send all batched messages out.
send_batch_to_network(ctx, messages_out).await;
// cleanup the removed relay parents and their states
old_view.difference(&view).for_each(|r| state.remove_relay_parent(r));
state.clean_up_live_under_cache();
@@ -489,17 +493,15 @@ where
Ok(())
}
#[tracing::instrument(level = "trace", skip(ctx, metrics, message_iter), fields(subsystem = LOG_TARGET))]
async fn send_tracked_gossip_messages_to_peers<Context>(
ctx: &mut Context,
// After this function is invoked, the state reflects the messages as having been sent to a peer.
#[tracing::instrument(level = "trace", skip(batch, metrics, message_iter), fields(subsystem = LOG_TARGET))]
fn add_tracked_messages_to_batch(
batch: &mut Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>,
per_candidate: &mut PerCandidate,
metrics: &Metrics,
peers: Vec<PeerId>,
message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
) {
for message in message_iter {
for peer in peers.iter() {
per_candidate
@@ -510,16 +512,25 @@ where
}
if !peers.is_empty() {
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
batch.push((
peers.clone(),
protocol_v1::ValidationProtocol::AvailabilityDistribution(message.into()),
).into()).await;
));
metrics.on_chunk_distributed();
}
}
}
async fn send_batch_to_network(
ctx: &mut impl SubsystemContext,
batch: Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>,
) {
if !batch.is_empty() {
ctx.send_message(NetworkBridgeMessage::SendValidationMessages(batch).into()).await
}
}
// Send the difference between two views which were not sent
// to that particular peer.
#[tracing::instrument(level = "trace", skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
@@ -544,6 +555,7 @@ where
let added_candidates = state.cached_live_candidates_unioned(added.iter());
// Send all messages we've seen before and the peer is now interested in.
let mut batch = Vec::new();
for candidate_hash in added_candidates {
let per_candidate = match state.per_candidate.get_mut(&candidate_hash) {
Some(p) => p,
@@ -564,8 +576,10 @@ where
.cloned()
.collect::<HashSet<_>>();
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![origin.clone()], messages).await;
add_tracked_messages_to_batch(&mut batch, per_candidate, metrics, vec![origin.clone()], messages);
}
send_batch_to_network(ctx, batch).await;
}
/// Obtain the first key which has a signing key.
@@ -753,7 +767,9 @@ where
drop(span);
// gossip that message to interested peers
send_tracked_gossip_messages_to_peers(ctx, candidate_entry, metrics, peers, iter::once(message)).await;
let mut batch = Vec::new();
add_tracked_messages_to_batch(&mut batch, candidate_entry, metrics, peers, iter::once(message));
send_batch_to_network(ctx, batch).await;
Ok(())
}
@@ -372,28 +372,32 @@ fn derive_erasure_chunks_with_proofs(
async fn expect_chunks_network_message(
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
peers: &[PeerId],
peers: &[Vec<PeerId>],
candidates: &[CandidateHash],
chunks: &[ErasureChunk],
) {
for _ in 0..chunks.len() {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
send_peers,
if chunks.is_empty() { return }
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessages(msgs)
) => {
assert_eq!(msgs.len(), chunks.len());
for (send_peers, msg) in msgs {
assert_matches!(
msg,
protocol_v1::ValidationProtocol::AvailabilityDistribution(
protocol_v1::AvailabilityDistributionMessage::Chunk(send_candidate, send_chunk),
),
)
) => {
assert!(candidates.contains(&send_candidate), format!("Could not find candidate: {:?}", send_candidate));
assert!(chunks.iter().any(|c| c == &send_chunk), format!("Could not find chunk: {:?}", send_chunk));
assert_eq!(peers.len(), send_peers.len());
assert!(peers.iter().all(|p| send_peers.contains(p)));
protocol_v1::AvailabilityDistributionMessage::Chunk(send_candidate, send_chunk)
) => {
let i = chunks.iter().position(|c| c == &send_chunk).unwrap();
assert!(candidates.contains(&send_candidate), format!("Could not find candidate: {:?}", send_candidate));
assert_eq!(&peers[i], &send_peers);
}
);
}
);
}
}
)
}
async fn change_our_view(
@@ -464,6 +468,9 @@ async fn change_our_view(
);
}
let mut send_peers = Vec::new();
let mut send_chunks = Vec::new();
let mut candidates = Vec::new();
for _ in 0..data_availability.len() {
let (available, candidate_hash) = assert_matches!(
overseer_recv(virtual_overseer).await,
@@ -485,6 +492,7 @@ async fn change_our_view(
continue;
}
candidates.push(candidate_hash);
if let Some((pov, persisted)) = chunk_data_per_candidate.get(&candidate_hash) {
let chunks = make_erasure_chunks(persisted.clone(), validator_public.len(), pov.clone());
@@ -506,11 +514,15 @@ async fn change_our_view(
);
if let Some(peers) = send_chunks_to.get(&candidate_hash) {
expect_chunks_network_message(virtual_overseer, &peers, &[candidate_hash], &[chunk]).await;
send_peers.push(peers.clone());
send_chunks.push(chunk);
}
}
}
}
expect_chunks_network_message(virtual_overseer, &send_peers, &candidates, &send_chunks).await;
}
async fn setup_peer_with_view(
@@ -725,17 +737,19 @@ fn reputation_verification() {
// Both peers send us this chunk already
chunks.remove(2);
expect_chunks_network_message(&mut virtual_overseer, &[peer_a.clone()], &[candidates[0].hash()], &chunks).await;
let send_peers = chunks.iter().map(|_| vec![peer_a.clone()]).collect::<Vec<_>>();
expect_chunks_network_message(&mut virtual_overseer, &send_peers, &[candidates[0].hash()], &chunks).await;
overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![current])).await;
expect_chunks_network_message(&mut virtual_overseer, &[peer_b.clone()], &[candidates[0].hash()], &chunks).await;
let send_peers = chunks.iter().map(|_| vec![peer_b.clone()]).collect::<Vec<_>>();
expect_chunks_network_message(&mut virtual_overseer, &send_peers, &[candidates[0].hash()], &chunks).await;
peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE_FIRST).await;
expect_chunks_network_message(
&mut virtual_overseer,
&[peer_b.clone()],
&[vec![peer_b.clone()]],
&[candidates[1].hash()],
&[valid.erasure_chunk.clone()],
).await;
@@ -901,9 +915,10 @@ fn candidate_chunks_are_put_into_message_vault_when_candidate_is_first_seen() {
validator_public.len(),
pov_blocks[0].clone(),
);
let send_peers = chunks.iter().map(|_| vec![peer_a.clone()]).collect::<Vec<_>>();
expect_chunks_network_message(
&mut virtual_overseer,
&[peer_a],
&send_peers,
&[candidates[0].hash()],
&chunks,
).await;
@@ -1253,9 +1268,11 @@ fn new_peer_gets_all_chunks_send() {
chunks.push(valid.erasure_chunk);
let send_peers = chunks.iter().map(|_| vec![peer_a.clone()]).collect::<Vec<_>>();
expect_chunks_network_message(
&mut virtual_overseer,
&[peer_a],
&send_peers,
&[candidates[0].hash(), candidates[1].hash()],
&chunks,
).await;