Peerset cleanup (#6078)

* Move methods from Peerset to peers structs

* Remove priority_only from peersstate

* Refactor PSM

* Don't test private fields

* Update sc_network

* Remove wrong comment

* Also fix small stupidity when setting reserved_only

* Put back priority_group

* Restore priority groups as before

* Apply suggestions from code review

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Do the reserved only change

* Update client/peerset/src/lib.rs

Co-authored-by: Arkadiy Paronyan <arkady.paronyan@gmail.com>

* Use HashSet::difference

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
Co-authored-by: Arkadiy Paronyan <arkady.paronyan@gmail.com>
This commit is contained in:
Pierre Krieger
2020-05-27 15:53:07 +02:00
committed by GitHub
parent 8279ba96df
commit 4b3f134b4a
3 changed files with 286 additions and 382 deletions
+140 -49
View File
@@ -182,9 +182,13 @@ pub struct PeersetConfig {
/// errors.
#[derive(Debug)]
pub struct Peerset {
/// Underlying data structure for the nodes's states.
data: peersstate::PeersState,
/// If true, we only accept reserved nodes.
reserved_only: bool,
/// Lists of nodes that don't occupy slots and that we should try to always be connected to.
/// Is kept in sync with the list of reserved nodes in [`Peerset::data`].
priority_groups: HashMap<String, HashSet<PeerId>>,
/// Receiver for messages from the `PeersetHandle` and from `tx`.
rx: TracingUnboundedReceiver<Action>,
/// Sending side of `rx`.
@@ -209,17 +213,18 @@ impl Peerset {
let now = Instant::now();
let mut peerset = Peerset {
data: peersstate::PeersState::new(config.in_peers, config.out_peers, config.reserved_only),
data: peersstate::PeersState::new(config.in_peers, config.out_peers),
tx,
rx,
reserved_only: config.reserved_only,
priority_groups: config.priority_groups.clone().into_iter().collect(),
message_queue: VecDeque::new(),
created: now,
latest_time_update: now,
};
for (group, nodes) in config.priority_groups {
peerset.data.set_priority_group(&group, nodes);
for node in config.priority_groups.into_iter().flat_map(|(_, l)| l) {
peerset.data.add_no_slot_node(node);
}
for peer_id in config.bootnodes {
@@ -235,61 +240,92 @@ impl Peerset {
}
fn on_add_reserved_peer(&mut self, peer_id: PeerId) {
let mut reserved = self.data.get_priority_group(RESERVED_NODES).unwrap_or_default();
reserved.insert(peer_id);
self.data.set_priority_group(RESERVED_NODES, reserved);
self.alloc_slots();
self.on_add_to_priority_group(RESERVED_NODES, peer_id);
}
fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
let mut reserved = self.data.get_priority_group(RESERVED_NODES).unwrap_or_default();
reserved.remove(&peer_id);
self.data.set_priority_group(RESERVED_NODES, reserved);
match self.data.peer(&peer_id) {
peersstate::Peer::Connected(peer) => {
if self.reserved_only {
peer.disconnect();
self.message_queue.push_back(Message::Drop(peer_id));
}
}
peersstate::Peer::NotConnected(_) => {},
peersstate::Peer::Unknown(_) => {},
}
self.on_remove_from_priority_group(RESERVED_NODES, peer_id);
}
fn on_set_reserved_only(&mut self, reserved_only: bool) {
self.reserved_only = reserved_only;
self.data.set_priority_only(reserved_only);
if self.reserved_only {
// Disconnect non-reserved nodes.
let reserved = self.data.get_priority_group(RESERVED_NODES).unwrap_or_default();
// Disconnect all the nodes that aren't reserved.
for peer_id in self.data.connected_peers().cloned().collect::<Vec<_>>().into_iter() {
if self.priority_groups.get(RESERVED_NODES).map_or(false, |g| g.contains(&peer_id)) {
continue;
}
let peer = self.data.peer(&peer_id).into_connected()
.expect("We are enumerating connected peers, therefore the peer is connected; qed");
if !reserved.contains(&peer_id) {
peer.disconnect();
self.message_queue.push_back(Message::Drop(peer_id));
}
peer.disconnect();
self.message_queue.push_back(Message::Drop(peer_id));
}
} else {
self.alloc_slots();
}
}
fn on_set_priority_group(&mut self, group_id: &str, peers: HashSet<PeerId>) {
self.data.set_priority_group(group_id, peers);
self.alloc_slots();
// Determine the difference between the current group and the new list.
let (to_insert, to_remove) = {
let current_group = self.priority_groups.entry(group_id.to_owned()).or_default();
let to_insert = peers.difference(current_group)
.cloned().collect::<Vec<_>>();
let to_remove = current_group.difference(&peers)
.cloned().collect::<Vec<_>>();
(to_insert, to_remove)
};
// Enumerate elements in `peers` not in `current_group`.
for peer_id in &to_insert {
// We don't call `on_add_to_priority_group` here in order to avoid calling
// `alloc_slots` all the time.
self.priority_groups.entry(group_id.to_owned()).or_default().insert(peer_id.clone());
self.data.add_no_slot_node(peer_id.clone());
}
// Enumerate elements in `current_group` not in `peers`.
for peer in to_remove {
self.on_remove_from_priority_group(group_id, peer);
}
if !to_insert.is_empty() {
self.alloc_slots();
}
}
fn on_add_to_priority_group(&mut self, group_id: &str, peer_id: PeerId) {
self.data.add_to_priority_group(group_id, peer_id);
self.priority_groups.entry(group_id.to_owned()).or_default().insert(peer_id.clone());
self.data.add_no_slot_node(peer_id);
self.alloc_slots();
}
fn on_remove_from_priority_group(&mut self, group_id: &str, peer_id: PeerId) {
self.data.remove_from_priority_group(group_id, &peer_id);
self.alloc_slots();
if let Some(priority_group) = self.priority_groups.get_mut(group_id) {
if !priority_group.remove(&peer_id) {
// `PeerId` wasn't in the group in the first place.
return;
}
} else {
// Group doesn't exist, so the `PeerId` can't be in it.
return;
}
// If that `PeerId` isn't in any other group, then it is no longer no-slot-occupying.
if !self.priority_groups.values().any(|l| l.contains(&peer_id)) {
self.data.remove_no_slot_node(&peer_id);
}
// Disconnect the peer if necessary.
if group_id != RESERVED_NODES && self.reserved_only {
if let peersstate::Peer::Connected(peer) = self.data.peer(&peer_id) {
peer.disconnect();
self.message_queue.push_back(Message::Drop(peer_id));
}
}
}
fn on_report_peer(&mut self, peer_id: PeerId, change: ReputationChange) {
@@ -376,25 +412,82 @@ impl Peerset {
fn alloc_slots(&mut self) {
self.update_time();
// Try to grab the next node to attempt to connect to.
while let Some(next) = {
if self.reserved_only {
self.data.priority_not_connected_peer_from_group(RESERVED_NODES)
} else {
self.data.priority_not_connected_peer()
}
} {
// Try to connect to all the reserved nodes that we are not connected to.
loop {
let next = {
let data = &mut self.data;
self.priority_groups
.get(RESERVED_NODES)
.into_iter()
.flatten()
.filter(move |n| {
data.peer(n).into_connected().is_none()
})
.next()
.cloned()
};
let next = match next {
Some(n) => n,
None => break,
};
let next = match self.data.peer(&next) {
peersstate::Peer::Unknown(n) => n.discover(),
peersstate::Peer::NotConnected(n) => n,
peersstate::Peer::Connected(_) => {
debug_assert!(false, "State inconsistency: not connected state");
break;
}
};
match next.try_outgoing() {
Ok(conn) => self.message_queue.push_back(Message::Connect(conn.into_peer_id())),
Err(_) => break, // No more slots available.
}
}
loop {
if self.reserved_only {
break
}
// Nothing more to do if we're in reserved mode.
if self.reserved_only {
return;
}
// Try to connect to all the nodes in priority groups and that we are not connected to.
loop {
let next = {
let data = &mut self.data;
self.priority_groups
.values()
.flatten()
.filter(move |n| {
data.peer(n).into_connected().is_none()
})
.next()
.cloned()
};
let next = match next {
Some(n) => n,
None => break,
};
let next = match self.data.peer(&next) {
peersstate::Peer::Unknown(n) => n.discover(),
peersstate::Peer::NotConnected(n) => n,
peersstate::Peer::Connected(_) => {
debug_assert!(false, "State inconsistency: not connected state");
break;
}
};
match next.try_outgoing() {
Ok(conn) => self.message_queue.push_back(Message::Connect(conn.into_peer_id())),
Err(_) => break, // No more slots available.
}
}
// Now, we try to connect to non-priority nodes.
loop {
// Try to grab the next node to attempt to connect to.
let next = match self.data.highest_not_connected_peer() {
Some(p) => p,
@@ -529,9 +622,9 @@ impl Peerset {
self.data.peers().len()
}
/// Returns priority group by id.
pub fn get_priority_group(&self, group_id: &str) -> Option<HashSet<PeerId>> {
self.data.get_priority_group(group_id)
/// Returns the content of a priority group.
pub fn priority_group(&self, group_id: &str) -> Option<impl ExactSizeIterator<Item = &PeerId>> {
self.priority_groups.get(group_id).map(|l| l.iter())
}
}
@@ -583,7 +676,6 @@ mod tests {
assert_eq!(message, expected_message);
peerset = p;
}
assert!(peerset.message_queue.is_empty(), peerset.message_queue);
peerset
}
@@ -713,4 +805,3 @@ mod tests {
futures::executor::block_on(fut);
}
}