Clean up sc-peerset (#9806)

* Clean up sc-peerset

* cargo +nightly fmt --all

* Nit

* Nit

* .

* Nit

* .

* Apply suggestions from code review

* .

* Update client/peerset/src/peersstate.rs

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Liu-Cheng Xu
2021-10-01 17:43:55 +08:00
committed by GitHub
parent f8ce186496
commit 00973f5b62
6 changed files with 62 additions and 72 deletions
+5 -5
View File
@@ -155,14 +155,14 @@ pub enum Role {
}
impl Role {
/// True for `Role::Authority`
/// True for [`Role::Authority`].
pub fn is_authority(&self) -> bool {
matches!(self, Role::Authority { .. })
matches!(self, Self::Authority { .. })
}
/// True for `Role::Light`
/// True for [`Role::Light`].
pub fn is_light(&self) -> bool {
matches!(self, Role::Light { .. })
matches!(self, Self::Light { .. })
}
}
@@ -329,7 +329,7 @@ impl FromStr for MultiaddrWithPeerId {
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (peer_id, multiaddr) = parse_str_addr(s)?;
Ok(MultiaddrWithPeerId { peer_id, multiaddr })
Ok(Self { peer_id, multiaddr })
}
}
@@ -417,7 +417,7 @@ impl Notifications {
/// Returns true if we have an open substream to the given peer.
pub fn is_open(&self, peer_id: &PeerId, set_id: sc_peerset::SetId) -> bool {
self.peers.get(&(peer_id.clone(), set_id)).map(|p| p.is_open()).unwrap_or(false)
self.peers.get(&(*peer_id, set_id)).map(|p| p.is_open()).unwrap_or(false)
}
/// Disconnects the given peer if we are connected to it.
@@ -1777,7 +1777,7 @@ impl NetworkBehaviour for Notifications {
"Handler({}, {:?}) => CloseResult({:?})",
source, connection, set_id);
match self.peers.get_mut(&(source.clone(), set_id)) {
match self.peers.get_mut(&(source, set_id)) {
// Move the connection from `Closing` to `Closed`.
Some(PeerState::Incoming { connections, .. }) |
Some(PeerState::DisabledPendingEnable { connections, .. }) |
+7 -10
View File
@@ -92,7 +92,7 @@ struct Metrics {
impl Metrics {
fn register(r: &Registry) -> Result<Self, PrometheusError> {
Ok(Metrics {
Ok(Self {
propagated_transactions: register(
Counter::new(
"sync_propagated_transactions",
@@ -133,7 +133,7 @@ pub struct TransactionsHandlerPrototype {
impl TransactionsHandlerPrototype {
/// Create a new instance.
pub fn new(protocol_id: ProtocolId) -> Self {
TransactionsHandlerPrototype {
Self {
protocol_name: Cow::from({
let mut proto = String::new();
proto.push_str("/");
@@ -401,7 +401,7 @@ impl<B: BlockT + 'static, H: ExHashT> TransactionsHandler<B, H> {
let hash = self.transaction_pool.hash_of(&t);
peer.known_transactions.insert(hash.clone());
self.service.report_peer(who.clone(), rep::ANY_TRANSACTION);
self.service.report_peer(who, rep::ANY_TRANSACTION);
match self.pending_transactions_peers.entry(hash.clone()) {
Entry::Vacant(entry) => {
@@ -409,10 +409,10 @@ impl<B: BlockT + 'static, H: ExHashT> TransactionsHandler<B, H> {
validation: self.transaction_pool.import(t),
tx_hash: hash,
});
entry.insert(vec![who.clone()]);
entry.insert(vec![who]);
},
Entry::Occupied(mut entry) => {
entry.get_mut().push(who.clone());
entry.get_mut().push(who);
},
}
}
@@ -468,11 +468,8 @@ impl<B: BlockT + 'static, H: ExHashT> TransactionsHandler<B, H> {
propagated_to.entry(hash).or_default().push(who.to_base58());
}
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
self.service.write_notification(
who.clone(),
self.protocol_name.clone(),
to_send.encode(),
);
self.service
.write_notification(*who, self.protocol_name.clone(), to_send.encode());
}
}
-1
View File
@@ -13,7 +13,6 @@ readme = "README.md"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
futures = "0.3.9"
libp2p = { version = "0.39.1", default-features = false }
+24 -25
View File
@@ -79,13 +79,13 @@ pub struct SetId(usize);
impl SetId {
pub const fn from(id: usize) -> Self {
SetId(id)
Self(id)
}
}
impl From<usize> for SetId {
fn from(id: usize) -> Self {
SetId(id)
Self(id)
}
}
@@ -107,12 +107,12 @@ pub struct ReputationChange {
impl ReputationChange {
/// New reputation change with given delta and reason.
pub const fn new(value: i32, reason: &'static str) -> ReputationChange {
ReputationChange { value, reason }
Self { value, reason }
}
/// New reputation change that forces minimum possible reputation.
pub const fn new_fatal(reason: &'static str) -> ReputationChange {
ReputationChange { value: i32::MIN, reason }
Self { value: i32::MIN, reason }
}
}
@@ -208,8 +208,8 @@ pub enum Message {
pub struct IncomingIndex(pub u64);
impl From<u64> for IncomingIndex {
fn from(val: u64) -> IncomingIndex {
IncomingIndex(val)
fn from(val: u64) -> Self {
Self(val)
}
}
@@ -274,7 +274,7 @@ pub struct Peerset {
impl Peerset {
/// Builds a new peerset from the given configuration.
pub fn from_config(config: PeersetConfig) -> (Peerset, PeersetHandle) {
pub fn from_config(config: PeersetConfig) -> (Self, PeersetHandle) {
let (tx, rx) = tracing_unbounded("mpsc_peerset_messages");
let handle = PeersetHandle { tx: tx.clone() };
@@ -282,7 +282,7 @@ impl Peerset {
let mut peerset = {
let now = Instant::now();
Peerset {
Self {
data: peersstate::PeersState::new(config.sets.iter().map(|set| {
peersstate::SetConfig { in_peers: set.in_peers, out_peers: set.out_peers }
})),
@@ -322,7 +322,7 @@ impl Peerset {
}
fn on_add_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) {
let newly_inserted = self.reserved_nodes[set_id.0].0.insert(peer_id.clone());
let newly_inserted = self.reserved_nodes[set_id.0].0.insert(peer_id);
if !newly_inserted {
return
}
@@ -422,8 +422,7 @@ impl Peerset {
match self.data.peer(set_id.0, &peer_id) {
peersstate::Peer::Connected(peer) => {
self.message_queue
.push_back(Message::Drop { set_id, peer_id: peer.peer_id().clone() });
self.message_queue.push_back(Message::Drop { set_id, peer_id: *peer.peer_id() });
peer.disconnect().forget_peer();
},
peersstate::Peer::NotConnected(peer) => {
@@ -819,8 +818,8 @@ mod tests {
};
let (peerset, handle) = Peerset::from_config(config);
handle.add_reserved_peer(SetId::from(0), reserved_peer.clone());
handle.add_reserved_peer(SetId::from(0), reserved_peer2.clone());
handle.add_reserved_peer(SetId::from(0), reserved_peer);
handle.add_reserved_peer(SetId::from(0), reserved_peer2);
assert_messages(
peerset,
@@ -845,22 +844,22 @@ mod tests {
sets: vec![SetConfig {
in_peers: 2,
out_peers: 1,
bootnodes: vec![bootnode.clone()],
bootnodes: vec![bootnode],
reserved_nodes: Default::default(),
reserved_only: false,
}],
};
let (mut peerset, _handle) = Peerset::from_config(config);
peerset.incoming(SetId::from(0), incoming.clone(), ii);
peerset.incoming(SetId::from(0), incoming.clone(), ii4);
peerset.incoming(SetId::from(0), incoming2.clone(), ii2);
peerset.incoming(SetId::from(0), incoming3.clone(), ii3);
peerset.incoming(SetId::from(0), incoming, ii);
peerset.incoming(SetId::from(0), incoming, ii4);
peerset.incoming(SetId::from(0), incoming2, ii2);
peerset.incoming(SetId::from(0), incoming3, ii3);
assert_messages(
peerset,
vec![
Message::Connect { set_id: SetId::from(0), peer_id: bootnode.clone() },
Message::Connect { set_id: SetId::from(0), peer_id: bootnode },
Message::Accept(ii),
Message::Accept(ii2),
Message::Reject(ii3),
@@ -883,7 +882,7 @@ mod tests {
};
let (mut peerset, _) = Peerset::from_config(config);
peerset.incoming(SetId::from(0), incoming.clone(), ii);
peerset.incoming(SetId::from(0), incoming, ii);
assert_messages(peerset, vec![Message::Reject(ii)]);
}
@@ -897,15 +896,15 @@ mod tests {
sets: vec![SetConfig {
in_peers: 0,
out_peers: 2,
bootnodes: vec![bootnode.clone()],
bootnodes: vec![bootnode],
reserved_nodes: Default::default(),
reserved_only: false,
}],
};
let (mut peerset, _handle) = Peerset::from_config(config);
peerset.add_to_peers_set(SetId::from(0), discovered.clone());
peerset.add_to_peers_set(SetId::from(0), discovered.clone());
peerset.add_to_peers_set(SetId::from(0), discovered);
peerset.add_to_peers_set(SetId::from(0), discovered);
peerset.add_to_peers_set(SetId::from(0), discovered2);
assert_messages(
@@ -931,7 +930,7 @@ mod tests {
// We ban a node by setting its reputation under the threshold.
let peer_id = PeerId::random();
handle.report_peer(peer_id.clone(), ReputationChange::new(BANNED_THRESHOLD - 1, ""));
handle.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, ""));
let fut = futures::future::poll_fn(move |cx| {
// We need one polling for the message to be processed.
@@ -974,7 +973,7 @@ mod tests {
// We ban a node by setting its reputation under the threshold.
let peer_id = PeerId::random();
handle.report_peer(peer_id.clone(), ReputationChange::new(BANNED_THRESHOLD - 1, ""));
handle.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, ""));
let fut = futures::future::poll_fn(move |cx| {
// We need one polling for the message to be processed.
+24 -29
View File
@@ -105,8 +105,8 @@ struct Node {
}
impl Node {
fn new(num_sets: usize) -> Node {
Node { sets: (0..num_sets).map(|_| MembershipState::NotMember).collect(), reputation: 0 }
fn new(num_sets: usize) -> Self {
Self { sets: (0..num_sets).map(|_| MembershipState::NotMember).collect(), reputation: 0 }
}
}
@@ -128,21 +128,24 @@ enum MembershipState {
}
impl MembershipState {
/// Returns `true` for `In` and `Out`.
/// Returns `true` for [`MembershipState::In`] and [`MembershipState::Out`].
fn is_connected(self) -> bool {
match self {
MembershipState::NotMember => false,
MembershipState::In => true,
MembershipState::Out => true,
MembershipState::NotConnected { .. } => false,
Self::In | Self::Out => true,
Self::NotMember | Self::NotConnected { .. } => false,
}
}
/// Returns `true` for [`MembershipState::NotConnected`].
fn is_not_connected(self) -> bool {
matches!(self, Self::NotConnected { .. })
}
}
impl PeersState {
/// Builds a new empty `PeersState`.
/// Builds a new empty [`PeersState`].
pub fn new(sets: impl IntoIterator<Item = SetConfig>) -> Self {
PeersState {
Self {
nodes: HashMap::new(),
sets: sets
.into_iter()
@@ -242,12 +245,7 @@ impl PeersState {
let outcome = self
.nodes
.iter_mut()
.filter(|(_, Node { sets, .. })| match sets[set] {
MembershipState::NotMember => false,
MembershipState::In => false,
MembershipState::Out => false,
MembershipState::NotConnected { .. } => true,
})
.filter(|(_, Node { sets, .. })| sets[set].is_not_connected())
.fold(None::<(&PeerId, &mut Node)>, |mut cur_node, to_try| {
if let Some(cur_node) = cur_node.take() {
if cur_node.1.reputation >= to_try.1.reputation {
@@ -318,35 +316,32 @@ pub enum Peer<'a> {
}
impl<'a> Peer<'a> {
/// If we are the `Connected` variant, returns the inner `ConnectedPeer`. Returns `None`
/// If we are the `Connected` variant, returns the inner [`ConnectedPeer`]. Returns `None`
/// otherwise.
pub fn into_connected(self) -> Option<ConnectedPeer<'a>> {
match self {
Peer::Connected(peer) => Some(peer),
Peer::NotConnected(_) => None,
Peer::Unknown(_) => None,
Self::Connected(peer) => Some(peer),
Self::NotConnected(..) | Self::Unknown(..) => None,
}
}
/// If we are the `Unknown` variant, returns the inner `ConnectedPeer`. Returns `None`
/// If we are the `NotConnected` variant, returns the inner [`NotConnectedPeer`]. Returns `None`
/// otherwise.
#[cfg(test)] // Feel free to remove this if this function is needed outside of tests
pub fn into_not_connected(self) -> Option<NotConnectedPeer<'a>> {
match self {
Peer::Connected(_) => None,
Peer::NotConnected(peer) => Some(peer),
Peer::Unknown(_) => None,
Self::NotConnected(peer) => Some(peer),
Self::Connected(..) | Self::Unknown(..) => None,
}
}
/// If we are the `Unknown` variant, returns the inner `ConnectedPeer`. Returns `None`
/// If we are the `Unknown` variant, returns the inner [`UnknownPeer`]. Returns `None`
/// otherwise.
#[cfg(test)] // Feel free to remove this if this function is needed outside of tests
pub fn into_unknown(self) -> Option<UnknownPeer<'a>> {
match self {
Peer::Connected(_) => None,
Peer::NotConnected(_) => None,
Peer::Unknown(peer) => Some(peer),
Self::Unknown(peer) => Some(peer),
Self::Connected(..) | Self::NotConnected(..) => None,
}
}
}
@@ -473,7 +468,7 @@ impl<'a> NotConnectedPeer<'a> {
/// the slots are full, the node stays "not connected" and we return `Err`.
///
/// Non-slot-occupying nodes don't count towards the number of slots.
pub fn try_outgoing(self) -> Result<ConnectedPeer<'a>, NotConnectedPeer<'a>> {
pub fn try_outgoing(self) -> Result<ConnectedPeer<'a>, Self> {
let is_no_slot_occupy = self.state.sets[self.set].no_slot_nodes.contains(&*self.peer_id);
// Note that it is possible for num_out to be strictly superior to the max, in case we were
@@ -500,7 +495,7 @@ impl<'a> NotConnectedPeer<'a> {
/// the slots are full, the node stays "not connected" and we return `Err`.
///
/// Non-slot-occupying nodes don't count towards the number of slots.
pub fn try_accept_incoming(self) -> Result<ConnectedPeer<'a>, NotConnectedPeer<'a>> {
pub fn try_accept_incoming(self) -> Result<ConnectedPeer<'a>, Self> {
let is_no_slot_occupy = self.state.sets[self.set].no_slot_nodes.contains(&*self.peer_id);
// Note that it is possible for num_in to be strictly superior to the max, in case we were