diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 0326e96b7b..8e3ada4005 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -550,6 +550,10 @@ where *current = view; + if added.is_empty() { + return + } + // only contains the intersection of what we are interested and // the union of all relay parent's candidates. let added_candidates = state.cached_live_candidates_unioned(added.iter()); diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 50fc8e7d87..a015947912 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -59,12 +59,10 @@ pub const VALIDATION_PROTOCOL_NAME: &'static str = "/polkadot/validation/1"; /// The protocol name for the collation peer-set. pub const COLLATION_PROTOCOL_NAME: &'static str = "/polkadot/collation/1"; -const MALFORMED_MESSAGE_COST: ReputationChange - = ReputationChange::new(-500, "Malformed Network-bridge message"); -const UNCONNECTED_PEERSET_COST: ReputationChange - = ReputationChange::new(-50, "Message sent to un-connected peer-set"); -const MALFORMED_VIEW_COST: ReputationChange - = ReputationChange::new(-500, "Malformed view"); +const MALFORMED_MESSAGE_COST: ReputationChange = ReputationChange::new(-500, "Malformed Network-bridge message"); +const UNCONNECTED_PEERSET_COST: ReputationChange = ReputationChange::new(-50, "Message sent to un-connected peer-set"); +const MALFORMED_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Malformed view"); +const EMPTY_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Peer sent us an empty view"); // network bridge log target const LOG_TARGET: &'static str = "network_bridge"; @@ -388,7 +386,11 @@ async fn update_our_view( collation_peers: &HashMap, ) -> SubsystemResult<()> { let new_view = construct_view(live_heads.iter().map(|v| v.0), finalized_number); - if *local_view == new_view { return Ok(()) } + + // We only want to send a view update when the heads changed, not when only the finalized block changed. + if local_view.heads == new_view.heads { + return Ok(()) + } *local_view = new_view.clone(); @@ -441,6 +443,13 @@ async fn handle_peer_messages( MALFORMED_VIEW_COST, ).await?; + continue + } else if new_view.heads.is_empty() { + net.report_peer( + peer.clone(), + EMPTY_VIEW_COST, + ).await?; + continue } else if new_view == peer_data.view { continue @@ -923,10 +932,11 @@ mod tests { } } - // network actions are sensitive to ordering of `PeerId`s within a `HashMap`, so - // we need to use this to prevent fragile reliance on peer ordering. - fn network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) -> bool { - actions.iter().find(|&x| x == action).is_some() + /// Assert that the given actions contain the given `action`. + fn assert_network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) { + if !actions.iter().any(|x| x == action) { + panic!("Could not find `{:?}` in `{:?}`", action, actions); + } } struct TestHarness { @@ -1035,23 +1045,85 @@ mod tests { view![hash_a] ).encode(); - assert!(network_actions_contains( + assert_network_actions_contains( &actions, &NetworkAction::WriteNotification( peer_a, PeerSet::Validation, wire_message.clone(), ), - )); + ); - assert!(network_actions_contains( + assert_network_actions_contains( &actions, &NetworkAction::WriteNotification( peer_b, PeerSet::Validation, wire_message.clone(), ), - )); + ); + }); + } + + #[test] + fn do_not_send_view_update_when_only_finalized_block_changed() { + test_harness(|test_harness| async move { + let TestHarness { mut network_handle, mut virtual_overseer } = test_harness; + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + + network_handle.connect_peer( + peer_a.clone(), + PeerSet::Validation, + ObservedRole::Full, + ).await; + network_handle.connect_peer( + peer_b.clone(), + PeerSet::Validation, + ObservedRole::Full, + ).await; + + let hash_a = Hash::repeat_byte(1); + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::BlockFinalized(Hash::random(), 5))).await; + + // Send some empty active leaves update + // + // This should not trigger a view update to our peers. + virtual_overseer.send( + FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::default())) + ).await; + + // This should trigger the view update to our peers. + virtual_overseer.send( + FromOverseer::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate::start_work(hash_a, Arc::new(JaegerSpan::Disabled)), + )) + ).await; + + let actions = network_handle.next_network_actions(2).await; + let wire_message = WireMessage::::ViewUpdate( + View { heads: vec![hash_a], finalized_number: 5 } + ).encode(); + + assert_network_actions_contains( + &actions, + &NetworkAction::WriteNotification( + peer_a, + PeerSet::Validation, + wire_message.clone(), + ), + ); + + assert_network_actions_contains( + &actions, + &NetworkAction::WriteNotification( + peer_b, + PeerSet::Validation, + wire_message.clone(), + ), + ); }); } @@ -1225,14 +1297,14 @@ mod tests { view![hash_a] ).encode(); - assert!(network_actions_contains( + assert_network_actions_contains( &actions, &NetworkAction::WriteNotification( peer.clone(), PeerSet::Collation, wire_message.clone(), ), - )); + ); }); } @@ -1292,13 +1364,13 @@ mod tests { ).await; let actions = network_handle.next_network_actions(1).await; - assert!(network_actions_contains( + assert_network_actions_contains( &actions, &NetworkAction::ReputationChange( peer_a.clone(), UNCONNECTED_PEERSET_COST, ), - )); + ); // peer B has the message relayed. @@ -1402,7 +1474,6 @@ mod tests { let hash_a = Hash::repeat_byte(1); let hash_b = Hash::repeat_byte(2); - let hash_c = Hash::repeat_byte(3); virtual_overseer.send( FromOverseer::Signal(OverseerSignal::BlockFinalized(hash_a, 1)) @@ -1421,39 +1492,14 @@ mod tests { } ).encode(); - assert!(network_actions_contains( + assert_network_actions_contains( &actions, &NetworkAction::WriteNotification( peer_a.clone(), PeerSet::Validation, wire_message.clone(), ), - )); - - // view updates are issued even when `ActiveLeavesUpdate` is empty - virtual_overseer.send( - FromOverseer::Signal(OverseerSignal::BlockFinalized(hash_c, 3)) - ).await; - virtual_overseer.send( - FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::default())) - ).await; - - let actions = network_handle.next_network_actions(1).await; - let wire_message = WireMessage::::ViewUpdate( - View { - heads: vec![hash_b], - finalized_number: 3, - } - ).encode(); - - assert!(network_actions_contains( - &actions, - &NetworkAction::WriteNotification( - peer_a, - PeerSet::Validation, - wire_message.clone(), - ), - )); + ); }); } diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 05420026e6..9c4f88a9ef 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -137,7 +137,7 @@ enum ToOverseer { /// This structure exists solely for the purposes of decoupling /// `Overseer` code from the client code and the necessity to call /// `HeaderBackend::block_number_from_id()`. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BlockInfo { /// hash of the block. pub hash: Hash, @@ -1514,7 +1514,9 @@ where update.activated.push((hash, span)); } - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } loop { select! { @@ -1620,10 +1622,14 @@ where self.on_head_deactivated(deactivated) } - self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; - // broadcast `ActiveLeavesUpdate` even if empty to issue view updates - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + + // If there are no leaves being deactivated, we don't need to send an update. + // + // Our peers will be informed about our finalized block the next time we activating/deactivating some leaf. + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } Ok(()) } @@ -1898,9 +1904,9 @@ mod tests { } } - struct TestSubsystem4; + struct ReturnOnStart; - impl Subsystem for TestSubsystem4 + impl Subsystem for ReturnOnStart where C: SubsystemContext { fn start(self, mut _ctx: C) -> SpawnedSubsystem { @@ -2043,29 +2049,22 @@ mod tests { // Spawn a subsystem that immediately exits. // - // Should immediately conclude the overseer itself with an error. + // Should immediately conclude the overseer itself. #[test] - fn overseer_panics_on_subsystem_exit() { + fn overseer_ends_on_subsystem_exit() { let spawner = sp_core::testing::TaskExecutor::new(); executor::block_on(async move { - let (s1_tx, _) = mpsc::channel(64); let all_subsystems = AllSubsystems::<()>::dummy() - .replace_candidate_validation(TestSubsystem1(s1_tx)) - .replace_candidate_backing(TestSubsystem4); + .replace_candidate_backing(ReturnOnStart); let (overseer, _handle) = Overseer::new( vec![], all_subsystems, None, spawner, ).unwrap(); - let overseer_fut = overseer.run().fuse(); - pin_mut!(overseer_fut); - select! { - res = overseer_fut => assert!(res.is_err()), - complete => (), - } + overseer.run().await.unwrap(); }) } @@ -2309,9 +2308,8 @@ mod tests { complete => break, } - if ss5_results.len() == expected_heartbeats.len() && - ss6_results.len() == expected_heartbeats.len() { - handler.stop().await; + if ss5_results.len() == expected_heartbeats.len() && ss6_results.len() == expected_heartbeats.len() { + handler.stop().await; } } @@ -2327,6 +2325,79 @@ mod tests { }); } + #[test] + fn do_not_send_empty_leaves_update_on_block_finalization() { + let spawner = sp_core::testing::TaskExecutor::new(); + + executor::block_on(async move { + let imported_block = BlockInfo { + hash: Hash::random(), + parent_hash: Hash::random(), + number: 1, + }; + + let finalized_block = BlockInfo { + hash: Hash::random(), + parent_hash: Hash::random(), + number: 1, + }; + + let (tx_5, mut rx_5) = mpsc::channel(64); + + let all_subsystems = AllSubsystems::<()>::dummy() + .replace_candidate_backing(TestSubsystem6(tx_5)); + + let (overseer, mut handler) = Overseer::new( + Vec::new(), + all_subsystems, + None, + spawner, + ).unwrap(); + + let overseer_fut = overseer.run().fuse(); + pin_mut!(overseer_fut); + + let mut ss5_results = Vec::new(); + + handler.block_finalized(finalized_block.clone()).await; + handler.block_imported(imported_block.clone()).await; + + let expected_heartbeats = vec![ + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: [ + (imported_block.hash, Arc::new(JaegerSpan::Disabled)), + ].as_ref().into(), + ..Default::default() + }), + OverseerSignal::BlockFinalized(finalized_block.hash, 1), + ]; + + loop { + select! { + res = overseer_fut => { + assert!(res.is_ok()); + break; + }, + res = rx_5.next() => { + if let Some(res) = dbg!(res) { + ss5_results.push(res); + } + } + } + + if ss5_results.len() == expected_heartbeats.len() { + handler.stop().await; + } + } + + assert_eq!(ss5_results.len(), expected_heartbeats.len()); + + for expected in expected_heartbeats { + assert!(ss5_results.contains(&expected)); + } + }); + } + #[derive(Clone)] struct CounterSubsystem { stop_signals_received: Arc, @@ -2542,7 +2613,7 @@ mod tests { assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); // x2 because of broadcast_signal on startup - assert_eq!(signals_received.load(atomic::Ordering::SeqCst), 2 * NUM_SUBSYSTEMS); + assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); // -1 for BitfieldSigning assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS - 1);