mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 09:51:02 +00:00
distribution: handle sqrt peer view updates (#2871)
* distribution: handle sqrt peer view updates * someone please put rustc into my brain * guide updates
This commit is contained in:
@@ -442,20 +442,15 @@ impl State {
|
|||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
view: View,
|
view: View,
|
||||||
) {
|
) {
|
||||||
|
let lucky = util::gen_ratio_sqrt_subset(self.peer_views.len(), util::MIN_GOSSIP_PEERS);
|
||||||
tracing::trace!(
|
tracing::trace!(
|
||||||
target: LOG_TARGET,
|
target: LOG_TARGET,
|
||||||
?view,
|
?view,
|
||||||
|
?lucky,
|
||||||
"Peer view change",
|
"Peer view change",
|
||||||
);
|
);
|
||||||
Self::unify_with_peer(
|
|
||||||
ctx,
|
|
||||||
metrics,
|
|
||||||
&mut self.blocks,
|
|
||||||
peer_id.clone(),
|
|
||||||
view.clone(),
|
|
||||||
).await;
|
|
||||||
let finalized_number = view.finalized_number;
|
let finalized_number = view.finalized_number;
|
||||||
let old_view = self.peer_views.insert(peer_id.clone(), view);
|
let old_view = self.peer_views.insert(peer_id.clone(), view.clone());
|
||||||
let old_finalized_number = old_view.map(|v| v.finalized_number).unwrap_or(0);
|
let old_finalized_number = old_view.map(|v| v.finalized_number).unwrap_or(0);
|
||||||
|
|
||||||
// we want to prune every block known_by peer up to (including) view.finalized_number
|
// we want to prune every block known_by peer up to (including) view.finalized_number
|
||||||
@@ -465,7 +460,7 @@ impl State {
|
|||||||
// but we need to make sure the range is not empty, otherwise it will panic
|
// but we need to make sure the range is not empty, otherwise it will panic
|
||||||
// it shouldn't be, we make sure of this in the network bridge
|
// it shouldn't be, we make sure of this in the network bridge
|
||||||
let range = old_finalized_number..=finalized_number;
|
let range = old_finalized_number..=finalized_number;
|
||||||
if !range.is_empty() {
|
if !range.is_empty() && !blocks.is_empty() {
|
||||||
self.blocks_by_number
|
self.blocks_by_number
|
||||||
.range(range)
|
.range(range)
|
||||||
.map(|(_number, hashes)| hashes)
|
.map(|(_number, hashes)| hashes)
|
||||||
@@ -476,6 +471,18 @@ impl State {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !lucky {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Self::unify_with_peer(
|
||||||
|
ctx,
|
||||||
|
metrics,
|
||||||
|
&mut self.blocks,
|
||||||
|
peer_id.clone(),
|
||||||
|
view,
|
||||||
|
).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_block_finalized(
|
fn handle_block_finalized(
|
||||||
|
|||||||
@@ -599,7 +599,15 @@ where
|
|||||||
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
|
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
|
||||||
{
|
{
|
||||||
let added = state.peer_views.entry(origin.clone()).or_default().replace_difference(view).cloned().collect::<Vec<_>>();
|
let added = state.peer_views.entry(origin.clone()).or_default().replace_difference(view).cloned().collect::<Vec<_>>();
|
||||||
|
let lucky = util::gen_ratio_sqrt_subset(state.peer_views.len(), util::MIN_GOSSIP_PEERS);
|
||||||
|
if !lucky {
|
||||||
|
tracing::trace!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
?origin,
|
||||||
|
"Peer view change is ignored",
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
// Send all messages we've seen before and the peer is now interested
|
// Send all messages we've seen before and the peer is now interested
|
||||||
// in to that peer.
|
// in to that peer.
|
||||||
|
|
||||||
|
|||||||
@@ -216,12 +216,26 @@ pub fn choose_random_sqrt_subset<T>(mut v: Vec<T>, min: usize) -> Vec<T> {
|
|||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
v.shuffle(&mut rng);
|
v.shuffle(&mut rng);
|
||||||
|
|
||||||
let len_sqrt = (v.len() as f64).sqrt() as usize;
|
let len = max_of_min_and_sqrt_len(v.len(), min);
|
||||||
let len = std::cmp::max(min, len_sqrt);
|
|
||||||
v.truncate(len);
|
v.truncate(len);
|
||||||
v
|
v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns bool with a probability of `max(len.sqrt(), min) / len`
|
||||||
|
/// being true.
|
||||||
|
pub fn gen_ratio_sqrt_subset(len: usize, min: usize) -> bool {
|
||||||
|
use rand::Rng as _;
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let threshold = max_of_min_and_sqrt_len(len, min);
|
||||||
|
let n = rng.gen_range(0..len);
|
||||||
|
n < threshold
|
||||||
|
}
|
||||||
|
|
||||||
|
fn max_of_min_and_sqrt_len(len: usize, min: usize) -> usize {
|
||||||
|
let len_sqrt = (len as f64).sqrt() as usize;
|
||||||
|
std::cmp::max(min, len_sqrt)
|
||||||
|
}
|
||||||
|
|
||||||
/// Local validator information
|
/// Local validator information
|
||||||
///
|
///
|
||||||
/// It can be created if the local node is a validator in the context of a particular
|
/// It can be created if the local node is a validator in the context of a particular
|
||||||
|
|||||||
@@ -9,3 +9,7 @@ from any validator in that set.
|
|||||||
Gossiping subsystems will be notified when a new peer connects or disconnects by network bridge.
|
Gossiping subsystems will be notified when a new peer connects or disconnects by network bridge.
|
||||||
It is their responsibility to limit the amount of outgoing gossip messages.
|
It is their responsibility to limit the amount of outgoing gossip messages.
|
||||||
At the moment we enforce a cap of `max(sqrt(peers.len()), 25)` message recipients at a time in each gossiping subsystem.
|
At the moment we enforce a cap of `max(sqrt(peers.len()), 25)` message recipients at a time in each gossiping subsystem.
|
||||||
|
|
||||||
|
We also flip a coin with the same probability when handling peer view updates in the distribution subsystems.
|
||||||
|
Over time the probability of not handling a peer view update converges to zero, so it shouldn't be cause much trouble.
|
||||||
|
This should be considered as a temporary measure until we implement a more robust solution for gossiping.
|
||||||
|
|||||||
Reference in New Issue
Block a user