From 475915ff107efe7a246f49624495f5364d800589 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 8 Jan 2021 19:05:19 +0100 Subject: [PATCH] Do not send empty view updates to peers (#2233) * Do not send empty view updates to peers It happened that we send empty view updates to our peers, because we only updated our finalized block. This could lead to situations where we overwhelmed sub systems with too many messages. On Rococo this lead to constant restarts of our nodes, because some node apparently was finalizing a lot of blocks. To prevent this, the pr is doing the following: 1. If a peer sends us an empty view, we report this peer and decrease it reputation. 2. We ensure that we only send a view update when the `heads` changed and not only the `finalized_number`. 3. We do not send empty `ActiveLeavesUpdates` from the overseer, as this makes no sense to send these empty updates. If some subsystem is relying on the finalized block, it needs to listen for the overseer signal. * Update node/network/bridge/src/lib.rs Co-authored-by: Peter Goodspeed-Niklaus * Don't work if they're are no added heads * Fix test * Ahhh * More fixes Co-authored-by: Peter Goodspeed-Niklaus --- .../availability-distribution/src/lib.rs | 4 + polkadot/node/network/bridge/src/lib.rs | 140 ++++++++++++------ polkadot/node/overseer/src/lib.rs | 115 +++++++++++--- 3 files changed, 190 insertions(+), 69 deletions(-) diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 0326e96b7b..8e3ada4005 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -550,6 +550,10 @@ where *current = view; + if added.is_empty() { + return + } + // only contains the intersection of what we are interested and // the union of all relay parent's candidates. let added_candidates = state.cached_live_candidates_unioned(added.iter()); diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 50fc8e7d87..a015947912 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -59,12 +59,10 @@ pub const VALIDATION_PROTOCOL_NAME: &'static str = "/polkadot/validation/1"; /// The protocol name for the collation peer-set. pub const COLLATION_PROTOCOL_NAME: &'static str = "/polkadot/collation/1"; -const MALFORMED_MESSAGE_COST: ReputationChange - = ReputationChange::new(-500, "Malformed Network-bridge message"); -const UNCONNECTED_PEERSET_COST: ReputationChange - = ReputationChange::new(-50, "Message sent to un-connected peer-set"); -const MALFORMED_VIEW_COST: ReputationChange - = ReputationChange::new(-500, "Malformed view"); +const MALFORMED_MESSAGE_COST: ReputationChange = ReputationChange::new(-500, "Malformed Network-bridge message"); +const UNCONNECTED_PEERSET_COST: ReputationChange = ReputationChange::new(-50, "Message sent to un-connected peer-set"); +const MALFORMED_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Malformed view"); +const EMPTY_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Peer sent us an empty view"); // network bridge log target const LOG_TARGET: &'static str = "network_bridge"; @@ -388,7 +386,11 @@ async fn update_our_view( collation_peers: &HashMap, ) -> SubsystemResult<()> { let new_view = construct_view(live_heads.iter().map(|v| v.0), finalized_number); - if *local_view == new_view { return Ok(()) } + + // We only want to send a view update when the heads changed, not when only the finalized block changed. + if local_view.heads == new_view.heads { + return Ok(()) + } *local_view = new_view.clone(); @@ -441,6 +443,13 @@ async fn handle_peer_messages( MALFORMED_VIEW_COST, ).await?; + continue + } else if new_view.heads.is_empty() { + net.report_peer( + peer.clone(), + EMPTY_VIEW_COST, + ).await?; + continue } else if new_view == peer_data.view { continue @@ -923,10 +932,11 @@ mod tests { } } - // network actions are sensitive to ordering of `PeerId`s within a `HashMap`, so - // we need to use this to prevent fragile reliance on peer ordering. - fn network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) -> bool { - actions.iter().find(|&x| x == action).is_some() + /// Assert that the given actions contain the given `action`. + fn assert_network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) { + if !actions.iter().any(|x| x == action) { + panic!("Could not find `{:?}` in `{:?}`", action, actions); + } } struct TestHarness { @@ -1035,23 +1045,85 @@ mod tests { view![hash_a] ).encode(); - assert!(network_actions_contains( + assert_network_actions_contains( &actions, &NetworkAction::WriteNotification( peer_a, PeerSet::Validation, wire_message.clone(), ), - )); + ); - assert!(network_actions_contains( + assert_network_actions_contains( &actions, &NetworkAction::WriteNotification( peer_b, PeerSet::Validation, wire_message.clone(), ), - )); + ); + }); + } + + #[test] + fn do_not_send_view_update_when_only_finalized_block_changed() { + test_harness(|test_harness| async move { + let TestHarness { mut network_handle, mut virtual_overseer } = test_harness; + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + + network_handle.connect_peer( + peer_a.clone(), + PeerSet::Validation, + ObservedRole::Full, + ).await; + network_handle.connect_peer( + peer_b.clone(), + PeerSet::Validation, + ObservedRole::Full, + ).await; + + let hash_a = Hash::repeat_byte(1); + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::BlockFinalized(Hash::random(), 5))).await; + + // Send some empty active leaves update + // + // This should not trigger a view update to our peers. + virtual_overseer.send( + FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::default())) + ).await; + + // This should trigger the view update to our peers. + virtual_overseer.send( + FromOverseer::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate::start_work(hash_a, Arc::new(JaegerSpan::Disabled)), + )) + ).await; + + let actions = network_handle.next_network_actions(2).await; + let wire_message = WireMessage::::ViewUpdate( + View { heads: vec![hash_a], finalized_number: 5 } + ).encode(); + + assert_network_actions_contains( + &actions, + &NetworkAction::WriteNotification( + peer_a, + PeerSet::Validation, + wire_message.clone(), + ), + ); + + assert_network_actions_contains( + &actions, + &NetworkAction::WriteNotification( + peer_b, + PeerSet::Validation, + wire_message.clone(), + ), + ); }); } @@ -1225,14 +1297,14 @@ mod tests { view![hash_a] ).encode(); - assert!(network_actions_contains( + assert_network_actions_contains( &actions, &NetworkAction::WriteNotification( peer.clone(), PeerSet::Collation, wire_message.clone(), ), - )); + ); }); } @@ -1292,13 +1364,13 @@ mod tests { ).await; let actions = network_handle.next_network_actions(1).await; - assert!(network_actions_contains( + assert_network_actions_contains( &actions, &NetworkAction::ReputationChange( peer_a.clone(), UNCONNECTED_PEERSET_COST, ), - )); + ); // peer B has the message relayed. @@ -1402,7 +1474,6 @@ mod tests { let hash_a = Hash::repeat_byte(1); let hash_b = Hash::repeat_byte(2); - let hash_c = Hash::repeat_byte(3); virtual_overseer.send( FromOverseer::Signal(OverseerSignal::BlockFinalized(hash_a, 1)) @@ -1421,39 +1492,14 @@ mod tests { } ).encode(); - assert!(network_actions_contains( + assert_network_actions_contains( &actions, &NetworkAction::WriteNotification( peer_a.clone(), PeerSet::Validation, wire_message.clone(), ), - )); - - // view updates are issued even when `ActiveLeavesUpdate` is empty - virtual_overseer.send( - FromOverseer::Signal(OverseerSignal::BlockFinalized(hash_c, 3)) - ).await; - virtual_overseer.send( - FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::default())) - ).await; - - let actions = network_handle.next_network_actions(1).await; - let wire_message = WireMessage::::ViewUpdate( - View { - heads: vec![hash_b], - finalized_number: 3, - } - ).encode(); - - assert!(network_actions_contains( - &actions, - &NetworkAction::WriteNotification( - peer_a, - PeerSet::Validation, - wire_message.clone(), - ), - )); + ); }); } diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 05420026e6..9c4f88a9ef 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -137,7 +137,7 @@ enum ToOverseer { /// This structure exists solely for the purposes of decoupling /// `Overseer` code from the client code and the necessity to call /// `HeaderBackend::block_number_from_id()`. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BlockInfo { /// hash of the block. pub hash: Hash, @@ -1514,7 +1514,9 @@ where update.activated.push((hash, span)); } - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } loop { select! { @@ -1620,10 +1622,14 @@ where self.on_head_deactivated(deactivated) } - self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; - // broadcast `ActiveLeavesUpdate` even if empty to issue view updates - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + + // If there are no leaves being deactivated, we don't need to send an update. + // + // Our peers will be informed about our finalized block the next time we activating/deactivating some leaf. + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } Ok(()) } @@ -1898,9 +1904,9 @@ mod tests { } } - struct TestSubsystem4; + struct ReturnOnStart; - impl Subsystem for TestSubsystem4 + impl Subsystem for ReturnOnStart where C: SubsystemContext { fn start(self, mut _ctx: C) -> SpawnedSubsystem { @@ -2043,29 +2049,22 @@ mod tests { // Spawn a subsystem that immediately exits. // - // Should immediately conclude the overseer itself with an error. + // Should immediately conclude the overseer itself. #[test] - fn overseer_panics_on_subsystem_exit() { + fn overseer_ends_on_subsystem_exit() { let spawner = sp_core::testing::TaskExecutor::new(); executor::block_on(async move { - let (s1_tx, _) = mpsc::channel(64); let all_subsystems = AllSubsystems::<()>::dummy() - .replace_candidate_validation(TestSubsystem1(s1_tx)) - .replace_candidate_backing(TestSubsystem4); + .replace_candidate_backing(ReturnOnStart); let (overseer, _handle) = Overseer::new( vec![], all_subsystems, None, spawner, ).unwrap(); - let overseer_fut = overseer.run().fuse(); - pin_mut!(overseer_fut); - select! { - res = overseer_fut => assert!(res.is_err()), - complete => (), - } + overseer.run().await.unwrap(); }) } @@ -2309,9 +2308,8 @@ mod tests { complete => break, } - if ss5_results.len() == expected_heartbeats.len() && - ss6_results.len() == expected_heartbeats.len() { - handler.stop().await; + if ss5_results.len() == expected_heartbeats.len() && ss6_results.len() == expected_heartbeats.len() { + handler.stop().await; } } @@ -2327,6 +2325,79 @@ mod tests { }); } + #[test] + fn do_not_send_empty_leaves_update_on_block_finalization() { + let spawner = sp_core::testing::TaskExecutor::new(); + + executor::block_on(async move { + let imported_block = BlockInfo { + hash: Hash::random(), + parent_hash: Hash::random(), + number: 1, + }; + + let finalized_block = BlockInfo { + hash: Hash::random(), + parent_hash: Hash::random(), + number: 1, + }; + + let (tx_5, mut rx_5) = mpsc::channel(64); + + let all_subsystems = AllSubsystems::<()>::dummy() + .replace_candidate_backing(TestSubsystem6(tx_5)); + + let (overseer, mut handler) = Overseer::new( + Vec::new(), + all_subsystems, + None, + spawner, + ).unwrap(); + + let overseer_fut = overseer.run().fuse(); + pin_mut!(overseer_fut); + + let mut ss5_results = Vec::new(); + + handler.block_finalized(finalized_block.clone()).await; + handler.block_imported(imported_block.clone()).await; + + let expected_heartbeats = vec![ + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: [ + (imported_block.hash, Arc::new(JaegerSpan::Disabled)), + ].as_ref().into(), + ..Default::default() + }), + OverseerSignal::BlockFinalized(finalized_block.hash, 1), + ]; + + loop { + select! { + res = overseer_fut => { + assert!(res.is_ok()); + break; + }, + res = rx_5.next() => { + if let Some(res) = dbg!(res) { + ss5_results.push(res); + } + } + } + + if ss5_results.len() == expected_heartbeats.len() { + handler.stop().await; + } + } + + assert_eq!(ss5_results.len(), expected_heartbeats.len()); + + for expected in expected_heartbeats { + assert!(ss5_results.contains(&expected)); + } + }); + } + #[derive(Clone)] struct CounterSubsystem { stop_signals_received: Arc, @@ -2542,7 +2613,7 @@ mod tests { assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); // x2 because of broadcast_signal on startup - assert_eq!(signals_received.load(atomic::Ordering::SeqCst), 2 * NUM_SUBSYSTEMS); + assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); // -1 for BitfieldSigning assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS - 1);