diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 80d60dc75c..0326e96b7b 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -394,6 +394,7 @@ where } // handle all candidates + let mut messages_out = Vec::new(); for candidate_hash in state.cached_live_candidates_unioned(view.difference(&old_view)) { // If we are not a validator for this candidate, let's skip it. match state.per_candidate.get(&candidate_hash) { @@ -475,13 +476,16 @@ where .cloned() .collect::>(); - send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await; + add_tracked_messages_to_batch(&mut messages_out, per_candidate, metrics, peers, iter::once(message)); } // traces are better if we wait until the loop is done to drop. per_candidate.drop_span_after_own_availability(); } + // send all batched messages out. + send_batch_to_network(ctx, messages_out).await; + // cleanup the removed relay parents and their states old_view.difference(&view).for_each(|r| state.remove_relay_parent(r)); state.clean_up_live_under_cache(); @@ -489,17 +493,15 @@ where Ok(()) } -#[tracing::instrument(level = "trace", skip(ctx, metrics, message_iter), fields(subsystem = LOG_TARGET))] -async fn send_tracked_gossip_messages_to_peers( - ctx: &mut Context, +// After this function is invoked, the state reflects the messages as having been sent to a peer. +#[tracing::instrument(level = "trace", skip(batch, metrics, message_iter), fields(subsystem = LOG_TARGET))] +fn add_tracked_messages_to_batch( + batch: &mut Vec<(Vec, protocol_v1::ValidationProtocol)>, per_candidate: &mut PerCandidate, metrics: &Metrics, peers: Vec, message_iter: impl IntoIterator, -) -where - Context: SubsystemContext, -{ +) { for message in message_iter { for peer in peers.iter() { per_candidate @@ -510,16 +512,25 @@ where } if !peers.is_empty() { - ctx.send_message(NetworkBridgeMessage::SendValidationMessage( + batch.push(( peers.clone(), protocol_v1::ValidationProtocol::AvailabilityDistribution(message.into()), - ).into()).await; + )); metrics.on_chunk_distributed(); } } } +async fn send_batch_to_network( + ctx: &mut impl SubsystemContext, + batch: Vec<(Vec, protocol_v1::ValidationProtocol)>, +) { + if !batch.is_empty() { + ctx.send_message(NetworkBridgeMessage::SendValidationMessages(batch).into()).await + } +} + // Send the difference between two views which were not sent // to that particular peer. #[tracing::instrument(level = "trace", skip(ctx, metrics), fields(subsystem = LOG_TARGET))] @@ -544,6 +555,7 @@ where let added_candidates = state.cached_live_candidates_unioned(added.iter()); // Send all messages we've seen before and the peer is now interested in. + let mut batch = Vec::new(); for candidate_hash in added_candidates { let per_candidate = match state.per_candidate.get_mut(&candidate_hash) { Some(p) => p, @@ -564,8 +576,10 @@ where .cloned() .collect::>(); - send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![origin.clone()], messages).await; + add_tracked_messages_to_batch(&mut batch, per_candidate, metrics, vec![origin.clone()], messages); } + + send_batch_to_network(ctx, batch).await; } /// Obtain the first key which has a signing key. @@ -753,7 +767,9 @@ where drop(span); // gossip that message to interested peers - send_tracked_gossip_messages_to_peers(ctx, candidate_entry, metrics, peers, iter::once(message)).await; + let mut batch = Vec::new(); + add_tracked_messages_to_batch(&mut batch, candidate_entry, metrics, peers, iter::once(message)); + send_batch_to_network(ctx, batch).await; Ok(()) } diff --git a/polkadot/node/network/availability-distribution/src/tests.rs b/polkadot/node/network/availability-distribution/src/tests.rs index 524855e713..f1573dc62d 100644 --- a/polkadot/node/network/availability-distribution/src/tests.rs +++ b/polkadot/node/network/availability-distribution/src/tests.rs @@ -372,28 +372,32 @@ fn derive_erasure_chunks_with_proofs( async fn expect_chunks_network_message( virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, - peers: &[PeerId], + peers: &[Vec], candidates: &[CandidateHash], chunks: &[ErasureChunk], ) { - for _ in 0..chunks.len() { - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( - send_peers, + if chunks.is_empty() { return } + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessages(msgs) + ) => { + assert_eq!(msgs.len(), chunks.len()); + for (send_peers, msg) in msgs { + assert_matches!( + msg, protocol_v1::ValidationProtocol::AvailabilityDistribution( - protocol_v1::AvailabilityDistributionMessage::Chunk(send_candidate, send_chunk), - ), - ) - ) => { - assert!(candidates.contains(&send_candidate), format!("Could not find candidate: {:?}", send_candidate)); - assert!(chunks.iter().any(|c| c == &send_chunk), format!("Could not find chunk: {:?}", send_chunk)); - assert_eq!(peers.len(), send_peers.len()); - assert!(peers.iter().all(|p| send_peers.contains(p))); + protocol_v1::AvailabilityDistributionMessage::Chunk(send_candidate, send_chunk) + ) => { + let i = chunks.iter().position(|c| c == &send_chunk).unwrap(); + assert!(candidates.contains(&send_candidate), format!("Could not find candidate: {:?}", send_candidate)); + assert_eq!(&peers[i], &send_peers); + } + ); } - ); - } + } + ) } async fn change_our_view( @@ -464,6 +468,9 @@ async fn change_our_view( ); } + let mut send_peers = Vec::new(); + let mut send_chunks = Vec::new(); + let mut candidates = Vec::new(); for _ in 0..data_availability.len() { let (available, candidate_hash) = assert_matches!( overseer_recv(virtual_overseer).await, @@ -485,6 +492,7 @@ async fn change_our_view( continue; } + candidates.push(candidate_hash); if let Some((pov, persisted)) = chunk_data_per_candidate.get(&candidate_hash) { let chunks = make_erasure_chunks(persisted.clone(), validator_public.len(), pov.clone()); @@ -506,11 +514,15 @@ async fn change_our_view( ); if let Some(peers) = send_chunks_to.get(&candidate_hash) { - expect_chunks_network_message(virtual_overseer, &peers, &[candidate_hash], &[chunk]).await; + send_peers.push(peers.clone()); + send_chunks.push(chunk); } } + } } + + expect_chunks_network_message(virtual_overseer, &send_peers, &candidates, &send_chunks).await; } async fn setup_peer_with_view( @@ -725,17 +737,19 @@ fn reputation_verification() { // Both peers send us this chunk already chunks.remove(2); - expect_chunks_network_message(&mut virtual_overseer, &[peer_a.clone()], &[candidates[0].hash()], &chunks).await; + let send_peers = chunks.iter().map(|_| vec![peer_a.clone()]).collect::>(); + expect_chunks_network_message(&mut virtual_overseer, &send_peers, &[candidates[0].hash()], &chunks).await; overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![current])).await; - expect_chunks_network_message(&mut virtual_overseer, &[peer_b.clone()], &[candidates[0].hash()], &chunks).await; + let send_peers = chunks.iter().map(|_| vec![peer_b.clone()]).collect::>(); + expect_chunks_network_message(&mut virtual_overseer, &send_peers, &[candidates[0].hash()], &chunks).await; peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE_FIRST).await; expect_chunks_network_message( &mut virtual_overseer, - &[peer_b.clone()], + &[vec![peer_b.clone()]], &[candidates[1].hash()], &[valid.erasure_chunk.clone()], ).await; @@ -901,9 +915,10 @@ fn candidate_chunks_are_put_into_message_vault_when_candidate_is_first_seen() { validator_public.len(), pov_blocks[0].clone(), ); + let send_peers = chunks.iter().map(|_| vec![peer_a.clone()]).collect::>(); expect_chunks_network_message( &mut virtual_overseer, - &[peer_a], + &send_peers, &[candidates[0].hash()], &chunks, ).await; @@ -1253,9 +1268,11 @@ fn new_peer_gets_all_chunks_send() { chunks.push(valid.erasure_chunk); + let send_peers = chunks.iter().map(|_| vec![peer_a.clone()]).collect::>(); + expect_chunks_network_message( &mut virtual_overseer, - &[peer_a], + &send_peers, &[candidates[0].hash(), candidates[1].hash()], &chunks, ).await; diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index d351c2ac62..50fc8e7d87 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -260,8 +260,8 @@ struct PeerData { #[derive(Debug)] enum Action { - SendValidationMessage(Vec, protocol_v1::ValidationProtocol), - SendCollationMessage(Vec, protocol_v1::CollationProtocol), + SendValidationMessages(Vec<(Vec, protocol_v1::ValidationProtocol)>), + SendCollationMessages(Vec<(Vec, protocol_v1::CollationProtocol)>), ConnectToValidators { validator_ids: Vec, connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, @@ -296,9 +296,13 @@ fn action_from_overseer_message( Ok(FromOverseer::Communication { msg }) => match msg { NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep), NetworkBridgeMessage::SendValidationMessage(peers, msg) - => Action::SendValidationMessage(peers, msg), + => Action::SendValidationMessages(vec![(peers, msg)]), NetworkBridgeMessage::SendCollationMessage(peers, msg) - => Action::SendCollationMessage(peers, msg), + => Action::SendCollationMessages(vec![(peers, msg)]), + NetworkBridgeMessage::SendValidationMessages(msgs) + => Action::SendValidationMessages(msgs), + NetworkBridgeMessage::SendCollationMessages(msgs) + => Action::SendCollationMessages(msgs), NetworkBridgeMessage::ConnectToValidators { validator_ids, connected } => Action::ConnectToValidators { validator_ids, connected }, }, @@ -623,19 +627,27 @@ where Action::Nop => {} Action::Abort => return Ok(()), - Action::SendValidationMessage(peers, msg) => send_message( - &mut network_service, - peers, - PeerSet::Validation, - WireMessage::ProtocolMessage(msg), - ).await?, + Action::SendValidationMessages(msgs) => { + for (peers, msg) in msgs { + send_message( + &mut network_service, + peers, + PeerSet::Validation, + WireMessage::ProtocolMessage(msg), + ).await? + } + } - Action::SendCollationMessage(peers, msg) => send_message( - &mut network_service, - peers, - PeerSet::Collation, - WireMessage::ProtocolMessage(msg), - ).await?, + Action::SendCollationMessages(msgs) => { + for (peers, msg) in msgs { + send_message( + &mut network_service, + peers, + PeerSet::Collation, + WireMessage::ProtocolMessage(msg), + ).await? + } + } Action::ConnectToValidators { validator_ids, diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 3b2006bea0..995256d5d4 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -203,6 +203,12 @@ pub enum NetworkBridgeMessage { /// Send a message to one or more peers on the collation peer-set. SendCollationMessage(Vec, protocol_v1::CollationProtocol), + /// Send a batch of validation messages. + SendValidationMessages(Vec<(Vec, protocol_v1::ValidationProtocol)>), + + /// Send a batch of collation messages. + SendCollationMessages(Vec<(Vec, protocol_v1::CollationProtocol)>), + /// Connect to peers who represent the given `validator_ids`. /// /// Also ask the network to stay connected to these peers at least @@ -225,6 +231,8 @@ impl NetworkBridgeMessage { Self::ReportPeer(_, _) => None, Self::SendValidationMessage(_, _) => None, Self::SendCollationMessage(_, _) => None, + Self::SendValidationMessages(_) => None, + Self::SendCollationMessages(_) => None, Self::ConnectToValidators { .. } => None, } } diff --git a/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md b/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md index 2a47459bd7..9f51094336 100644 --- a/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md +++ b/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md @@ -86,11 +86,11 @@ Map the message onto the corresponding [Event Handler](#event-handlers) based on - Adjust peer reputation according to cost or benefit provided -### SendValidationMessage +### SendValidationMessage / SendValidationMessages - Issue a corresponding `ProtocolMessage` to each listed peer on the validation peer-set. -### SendCollationMessage +### SendCollationMessage / SendCollationMessages - Issue a corresponding `ProtocolMessage` to each listed peer on the collation peer-set. diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md index 392927f9eb..02219a91f1 100644 --- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -320,7 +320,11 @@ enum NetworkBridgeMessage { /// Send a message to one or more peers on the validation peerset. SendValidationMessage([PeerId], ValidationProtocolV1), /// Send a message to one or more peers on the collation peerset. - SendCollationMessage([PeerId], ValidationProtocolV1), + SendCollationMessage([PeerId], CollationProtocolV1), + /// Send multiple validation messages. + SendValidationMessages([([PeerId, ValidationProtocolV1])]), + /// Send multiple collation messages. + SendCollationMessages([([PeerId, ValidationProtocolV1])]), /// Connect to peers who represent the given `validator_ids`. /// /// Also ask the network to stay connected to these peers at least