mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 00:01:09 +00:00
Do not send empty view updates to peers (#2233)
* Do not send empty view updates to peers It happened that we sent empty view updates to our peers, because we only updated our finalized block. This could lead to situations where we overwhelmed subsystems with too many messages. On Rococo this led to constant restarts of our nodes, because some node apparently was finalizing a lot of blocks. To prevent this, this PR does the following: 1. If a peer sends us an empty view, we report this peer and decrease its reputation. 2. We ensure that we only send a view update when the `heads` changed and not only the `finalized_number`. 3. We do not send empty `ActiveLeavesUpdates` from the overseer, as it makes no sense to send these empty updates. If some subsystem is relying on the finalized block, it needs to listen for the overseer signal. * Update node/network/bridge/src/lib.rs Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com> * Don't work if there are no added heads * Fix test * Ahhh * More fixes Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
This commit is contained in:
@@ -550,6 +550,10 @@ where
|
||||
|
||||
*current = view;
|
||||
|
||||
if added.is_empty() {
|
||||
return
|
||||
}
|
||||
|
||||
// only contains the intersection of what we are interested in and
|
||||
// the union of all relay parent's candidates.
|
||||
let added_candidates = state.cached_live_candidates_unioned(added.iter());
|
||||
|
||||
@@ -59,12 +59,10 @@ pub const VALIDATION_PROTOCOL_NAME: &'static str = "/polkadot/validation/1";
|
||||
/// The protocol name for the collation peer-set.
|
||||
pub const COLLATION_PROTOCOL_NAME: &'static str = "/polkadot/collation/1";
|
||||
|
||||
const MALFORMED_MESSAGE_COST: ReputationChange
|
||||
= ReputationChange::new(-500, "Malformed Network-bridge message");
|
||||
const UNCONNECTED_PEERSET_COST: ReputationChange
|
||||
= ReputationChange::new(-50, "Message sent to un-connected peer-set");
|
||||
const MALFORMED_VIEW_COST: ReputationChange
|
||||
= ReputationChange::new(-500, "Malformed view");
|
||||
const MALFORMED_MESSAGE_COST: ReputationChange = ReputationChange::new(-500, "Malformed Network-bridge message");
|
||||
const UNCONNECTED_PEERSET_COST: ReputationChange = ReputationChange::new(-50, "Message sent to un-connected peer-set");
|
||||
const MALFORMED_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Malformed view");
|
||||
const EMPTY_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Peer sent us an empty view");
|
||||
|
||||
// network bridge log target
|
||||
const LOG_TARGET: &'static str = "network_bridge";
|
||||
@@ -388,7 +386,11 @@ async fn update_our_view(
|
||||
collation_peers: &HashMap<PeerId, PeerData>,
|
||||
) -> SubsystemResult<()> {
|
||||
let new_view = construct_view(live_heads.iter().map(|v| v.0), finalized_number);
|
||||
if *local_view == new_view { return Ok(()) }
|
||||
|
||||
// We only want to send a view update when the heads changed, not when only the finalized block changed.
|
||||
if local_view.heads == new_view.heads {
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
*local_view = new_view.clone();
|
||||
|
||||
@@ -441,6 +443,13 @@ async fn handle_peer_messages<M>(
|
||||
MALFORMED_VIEW_COST,
|
||||
).await?;
|
||||
|
||||
continue
|
||||
} else if new_view.heads.is_empty() {
|
||||
net.report_peer(
|
||||
peer.clone(),
|
||||
EMPTY_VIEW_COST,
|
||||
).await?;
|
||||
|
||||
continue
|
||||
} else if new_view == peer_data.view {
|
||||
continue
|
||||
@@ -923,10 +932,11 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
// network actions are sensitive to ordering of `PeerId`s within a `HashMap`, so
|
||||
// we need to use this to prevent fragile reliance on peer ordering.
|
||||
fn network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) -> bool {
|
||||
actions.iter().find(|&x| x == action).is_some()
|
||||
/// Assert that the given actions contain the given `action`.
|
||||
fn assert_network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) {
|
||||
if !actions.iter().any(|x| x == action) {
|
||||
panic!("Could not find `{:?}` in `{:?}`", action, actions);
|
||||
}
|
||||
}
|
||||
|
||||
struct TestHarness {
|
||||
@@ -1035,23 +1045,85 @@ mod tests {
|
||||
view![hash_a]
|
||||
).encode();
|
||||
|
||||
assert!(network_actions_contains(
|
||||
assert_network_actions_contains(
|
||||
&actions,
|
||||
&NetworkAction::WriteNotification(
|
||||
peer_a,
|
||||
PeerSet::Validation,
|
||||
wire_message.clone(),
|
||||
),
|
||||
));
|
||||
);
|
||||
|
||||
assert!(network_actions_contains(
|
||||
assert_network_actions_contains(
|
||||
&actions,
|
||||
&NetworkAction::WriteNotification(
|
||||
peer_b,
|
||||
PeerSet::Validation,
|
||||
wire_message.clone(),
|
||||
),
|
||||
));
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn do_not_send_view_update_when_only_finalized_block_changed() {
|
||||
test_harness(|test_harness| async move {
|
||||
let TestHarness { mut network_handle, mut virtual_overseer } = test_harness;
|
||||
|
||||
let peer_a = PeerId::random();
|
||||
let peer_b = PeerId::random();
|
||||
|
||||
network_handle.connect_peer(
|
||||
peer_a.clone(),
|
||||
PeerSet::Validation,
|
||||
ObservedRole::Full,
|
||||
).await;
|
||||
network_handle.connect_peer(
|
||||
peer_b.clone(),
|
||||
PeerSet::Validation,
|
||||
ObservedRole::Full,
|
||||
).await;
|
||||
|
||||
let hash_a = Hash::repeat_byte(1);
|
||||
|
||||
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::BlockFinalized(Hash::random(), 5))).await;
|
||||
|
||||
// Send some empty active leaves update
|
||||
//
|
||||
// This should not trigger a view update to our peers.
|
||||
virtual_overseer.send(
|
||||
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::default()))
|
||||
).await;
|
||||
|
||||
// This should trigger the view update to our peers.
|
||||
virtual_overseer.send(
|
||||
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::start_work(hash_a, Arc::new(JaegerSpan::Disabled)),
|
||||
))
|
||||
).await;
|
||||
|
||||
let actions = network_handle.next_network_actions(2).await;
|
||||
let wire_message = WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(
|
||||
View { heads: vec![hash_a], finalized_number: 5 }
|
||||
).encode();
|
||||
|
||||
assert_network_actions_contains(
|
||||
&actions,
|
||||
&NetworkAction::WriteNotification(
|
||||
peer_a,
|
||||
PeerSet::Validation,
|
||||
wire_message.clone(),
|
||||
),
|
||||
);
|
||||
|
||||
assert_network_actions_contains(
|
||||
&actions,
|
||||
&NetworkAction::WriteNotification(
|
||||
peer_b,
|
||||
PeerSet::Validation,
|
||||
wire_message.clone(),
|
||||
),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1225,14 +1297,14 @@ mod tests {
|
||||
view![hash_a]
|
||||
).encode();
|
||||
|
||||
assert!(network_actions_contains(
|
||||
assert_network_actions_contains(
|
||||
&actions,
|
||||
&NetworkAction::WriteNotification(
|
||||
peer.clone(),
|
||||
PeerSet::Collation,
|
||||
wire_message.clone(),
|
||||
),
|
||||
));
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1292,13 +1364,13 @@ mod tests {
|
||||
).await;
|
||||
|
||||
let actions = network_handle.next_network_actions(1).await;
|
||||
assert!(network_actions_contains(
|
||||
assert_network_actions_contains(
|
||||
&actions,
|
||||
&NetworkAction::ReputationChange(
|
||||
peer_a.clone(),
|
||||
UNCONNECTED_PEERSET_COST,
|
||||
),
|
||||
));
|
||||
);
|
||||
|
||||
// peer B has the message relayed.
|
||||
|
||||
@@ -1402,7 +1474,6 @@ mod tests {
|
||||
|
||||
let hash_a = Hash::repeat_byte(1);
|
||||
let hash_b = Hash::repeat_byte(2);
|
||||
let hash_c = Hash::repeat_byte(3);
|
||||
|
||||
virtual_overseer.send(
|
||||
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash_a, 1))
|
||||
@@ -1421,39 +1492,14 @@ mod tests {
|
||||
}
|
||||
).encode();
|
||||
|
||||
assert!(network_actions_contains(
|
||||
assert_network_actions_contains(
|
||||
&actions,
|
||||
&NetworkAction::WriteNotification(
|
||||
peer_a.clone(),
|
||||
PeerSet::Validation,
|
||||
wire_message.clone(),
|
||||
),
|
||||
));
|
||||
|
||||
// view updates are issued even when `ActiveLeavesUpdate` is empty
|
||||
virtual_overseer.send(
|
||||
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash_c, 3))
|
||||
).await;
|
||||
virtual_overseer.send(
|
||||
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::default()))
|
||||
).await;
|
||||
|
||||
let actions = network_handle.next_network_actions(1).await;
|
||||
let wire_message = WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(
|
||||
View {
|
||||
heads: vec![hash_b],
|
||||
finalized_number: 3,
|
||||
}
|
||||
).encode();
|
||||
|
||||
assert!(network_actions_contains(
|
||||
&actions,
|
||||
&NetworkAction::WriteNotification(
|
||||
peer_a,
|
||||
PeerSet::Validation,
|
||||
wire_message.clone(),
|
||||
),
|
||||
));
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -137,7 +137,7 @@ enum ToOverseer {
|
||||
/// This structure exists solely for the purposes of decoupling
|
||||
/// `Overseer` code from the client code and the necessity to call
|
||||
/// `HeaderBackend::block_number_from_id()`.
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BlockInfo {
|
||||
/// hash of the block.
|
||||
pub hash: Hash,
|
||||
@@ -1514,7 +1514,9 @@ where
|
||||
update.activated.push((hash, span));
|
||||
}
|
||||
|
||||
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
|
||||
if !update.is_empty() {
|
||||
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
|
||||
}
|
||||
|
||||
loop {
|
||||
select! {
|
||||
@@ -1620,10 +1622,14 @@ where
|
||||
self.on_head_deactivated(deactivated)
|
||||
}
|
||||
|
||||
|
||||
self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?;
|
||||
// broadcast `ActiveLeavesUpdate` even if empty to issue view updates
|
||||
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
|
||||
|
||||
// If there are no leaves being deactivated, we don't need to send an update.
|
||||
//
|
||||
// Our peers will be informed about our finalized block the next time we activate or deactivate a leaf.
|
||||
if !update.is_empty() {
|
||||
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1898,9 +1904,9 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
struct TestSubsystem4;
|
||||
struct ReturnOnStart;
|
||||
|
||||
impl<C> Subsystem<C> for TestSubsystem4
|
||||
impl<C> Subsystem<C> for ReturnOnStart
|
||||
where C: SubsystemContext<Message=CandidateBackingMessage>
|
||||
{
|
||||
fn start(self, mut _ctx: C) -> SpawnedSubsystem {
|
||||
@@ -2043,29 +2049,22 @@ mod tests {
|
||||
|
||||
// Spawn a subsystem that immediately exits.
|
||||
//
|
||||
// Should immediately conclude the overseer itself with an error.
|
||||
// Should immediately conclude the overseer itself.
|
||||
#[test]
|
||||
fn overseer_panics_on_subsystem_exit() {
|
||||
fn overseer_ends_on_subsystem_exit() {
|
||||
let spawner = sp_core::testing::TaskExecutor::new();
|
||||
|
||||
executor::block_on(async move {
|
||||
let (s1_tx, _) = mpsc::channel(64);
|
||||
let all_subsystems = AllSubsystems::<()>::dummy()
|
||||
.replace_candidate_validation(TestSubsystem1(s1_tx))
|
||||
.replace_candidate_backing(TestSubsystem4);
|
||||
.replace_candidate_backing(ReturnOnStart);
|
||||
let (overseer, _handle) = Overseer::new(
|
||||
vec![],
|
||||
all_subsystems,
|
||||
None,
|
||||
spawner,
|
||||
).unwrap();
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
pin_mut!(overseer_fut);
|
||||
|
||||
select! {
|
||||
res = overseer_fut => assert!(res.is_err()),
|
||||
complete => (),
|
||||
}
|
||||
overseer.run().await.unwrap();
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2309,9 +2308,8 @@ mod tests {
|
||||
complete => break,
|
||||
}
|
||||
|
||||
if ss5_results.len() == expected_heartbeats.len() &&
|
||||
ss6_results.len() == expected_heartbeats.len() {
|
||||
handler.stop().await;
|
||||
if ss5_results.len() == expected_heartbeats.len() && ss6_results.len() == expected_heartbeats.len() {
|
||||
handler.stop().await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2327,6 +2325,79 @@ mod tests {
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn do_not_send_empty_leaves_update_on_block_finalization() {
|
||||
let spawner = sp_core::testing::TaskExecutor::new();
|
||||
|
||||
executor::block_on(async move {
|
||||
let imported_block = BlockInfo {
|
||||
hash: Hash::random(),
|
||||
parent_hash: Hash::random(),
|
||||
number: 1,
|
||||
};
|
||||
|
||||
let finalized_block = BlockInfo {
|
||||
hash: Hash::random(),
|
||||
parent_hash: Hash::random(),
|
||||
number: 1,
|
||||
};
|
||||
|
||||
let (tx_5, mut rx_5) = mpsc::channel(64);
|
||||
|
||||
let all_subsystems = AllSubsystems::<()>::dummy()
|
||||
.replace_candidate_backing(TestSubsystem6(tx_5));
|
||||
|
||||
let (overseer, mut handler) = Overseer::new(
|
||||
Vec::new(),
|
||||
all_subsystems,
|
||||
None,
|
||||
spawner,
|
||||
).unwrap();
|
||||
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
pin_mut!(overseer_fut);
|
||||
|
||||
let mut ss5_results = Vec::new();
|
||||
|
||||
handler.block_finalized(finalized_block.clone()).await;
|
||||
handler.block_imported(imported_block.clone()).await;
|
||||
|
||||
let expected_heartbeats = vec![
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated: [
|
||||
(imported_block.hash, Arc::new(JaegerSpan::Disabled)),
|
||||
].as_ref().into(),
|
||||
..Default::default()
|
||||
}),
|
||||
OverseerSignal::BlockFinalized(finalized_block.hash, 1),
|
||||
];
|
||||
|
||||
loop {
|
||||
select! {
|
||||
res = overseer_fut => {
|
||||
assert!(res.is_ok());
|
||||
break;
|
||||
},
|
||||
res = rx_5.next() => {
|
||||
if let Some(res) = dbg!(res) {
|
||||
ss5_results.push(res);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ss5_results.len() == expected_heartbeats.len() {
|
||||
handler.stop().await;
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(ss5_results.len(), expected_heartbeats.len());
|
||||
|
||||
for expected in expected_heartbeats {
|
||||
assert!(ss5_results.contains(&expected));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct CounterSubsystem {
|
||||
stop_signals_received: Arc<atomic::AtomicUsize>,
|
||||
@@ -2542,7 +2613,7 @@ mod tests {
|
||||
|
||||
assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
|
||||
// x2 because of broadcast_signal on startup
|
||||
assert_eq!(signals_received.load(atomic::Ordering::SeqCst), 2 * NUM_SUBSYSTEMS);
|
||||
assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
|
||||
// -1 for BitfieldSigning
|
||||
assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS - 1);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user