Split Peerset into PeerStore & ProtocolControllers (#13611)

This commit is contained in:
Dmitry Markin
2023-05-23 14:49:02 +03:00
committed by GitHub
parent 194c9edd4a
commit 01107e9ca5
19 changed files with 2915 additions and 1729 deletions
@@ -25,7 +25,7 @@ use crate::{
use bytes::BytesMut;
use fnv::FnvHashMap;
use futures::prelude::*;
use futures::{channel::oneshot, prelude::*};
use libp2p::{
core::{ConnectedPoint, Endpoint, Multiaddr},
swarm::{
@@ -35,7 +35,7 @@ use libp2p::{
},
PeerId,
};
use log::{error, trace, warn};
use log::{debug, error, info, trace, warn};
use parking_lot::RwLock;
use rand::distributions::{Distribution as _, Uniform};
use sc_peerset::DropReason;
@@ -231,6 +231,9 @@ enum PeerState {
/// If `Some`, any dial attempts to this peer are delayed until the given `Instant`.
backoff_until: Option<Instant>,
/// Incoming index tracking this connection.
incoming_index: sc_peerset::IncomingIndex,
/// List of connections with this peer, and their state.
connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>,
},
@@ -493,7 +496,7 @@ impl Notifications {
// Incoming => Disabled.
// Ongoing opening requests from the remote are rejected.
PeerState::Incoming { mut connections, backoff_until } => {
PeerState::Incoming { mut connections, backoff_until, .. } => {
let inc = if let Some(inc) = self
.incoming
.iter_mut()
@@ -536,8 +539,12 @@ impl Notifications {
}
/// Returns the list of reserved peers.
pub fn reserved_peers(&self, set_id: sc_peerset::SetId) -> impl Iterator<Item = &PeerId> {
self.peerset.reserved_peers(set_id)
pub fn reserved_peers(
&self,
set_id: sc_peerset::SetId,
pending_response: oneshot::Sender<Vec<PeerId>>,
) {
self.peerset.reserved_peers(set_id, pending_response);
}
/// Returns the state of the peerset manager, for debugging purposes.
@@ -686,65 +693,34 @@ impl Notifications {
};
}
},
// Incoming => Enabled
PeerState::Incoming { mut connections, .. } => {
trace!(target: "sub-libp2p", "PSM => Connect({}, {:?}): Enabling connections.",
occ_entry.key().0, set_id);
if let Some(inc) = self
.incoming
.iter_mut()
.find(|i| i.peer_id == occ_entry.key().0 && i.set_id == set_id && i.alive)
{
inc.alive = false;
} else {
error!(
target: "sub-libp2p",
"State mismatch in libp2p: no entry in incoming for incoming peer",
)
}
debug_assert!(connections
.iter()
.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
for (connec_id, connec_state) in connections
.iter_mut()
.filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote))
{
trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})",
occ_entry.key(), *connec_id, set_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: occ_entry.key().0,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Opening;
}
*occ_entry.into_mut() = PeerState::Enabled { connections };
// Incoming => Incoming
st @ PeerState::Incoming { .. } => {
debug!(
target: "sub-libp2p",
"PSM => Connect({}, {:?}): Ignoring obsolete connect, we are awaiting accept/reject.",
occ_entry.key().0, set_id
);
*occ_entry.into_mut() = st;
},
// Other states are kept as-is.
st @ PeerState::Enabled { .. } => {
warn!(target: "sub-libp2p",
debug!(target: "sub-libp2p",
"PSM => Connect({}, {:?}): Already connected.",
occ_entry.key().0, set_id);
*occ_entry.into_mut() = st;
debug_assert!(false);
},
st @ PeerState::DisabledPendingEnable { .. } => {
warn!(target: "sub-libp2p",
debug!(target: "sub-libp2p",
"PSM => Connect({}, {:?}): Already pending enabling.",
occ_entry.key().0, set_id);
*occ_entry.into_mut() = st;
debug_assert!(false);
},
st @ PeerState::Requested { .. } | st @ PeerState::PendingRequest { .. } => {
warn!(target: "sub-libp2p",
debug!(target: "sub-libp2p",
"PSM => Connect({}, {:?}): Duplicate request.",
occ_entry.key().0, set_id);
*occ_entry.into_mut() = st;
debug_assert!(false);
},
PeerState::Poisoned => {
@@ -847,10 +823,12 @@ impl Notifications {
// Invalid state transitions.
st @ PeerState::Incoming { .. } => {
error!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Not enabled (Incoming).",
entry.key().0, set_id);
info!(
target: "sub-libp2p",
"PSM => Drop({}, {:?}): Ignoring obsolete disconnect, we are awaiting accept/reject.",
entry.key().0, set_id,
);
*entry.into_mut() = st;
debug_assert!(false);
},
PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of {:?} is poisoned", entry.key());
@@ -895,7 +873,24 @@ impl Notifications {
match mem::replace(state, PeerState::Poisoned) {
// Incoming => Enabled
PeerState::Incoming { mut connections, .. } => {
PeerState::Incoming { mut connections, incoming_index, .. } => {
if index < incoming_index {
warn!(
target: "sub-libp2p",
"PSM => Accept({:?}, {}, {:?}): Ignoring obsolete incoming index, we are already awaiting {:?}.",
index, incoming.peer_id, incoming.set_id, incoming_index
);
return
} else if index > incoming_index {
error!(
target: "sub-libp2p",
"PSM => Accept({:?}, {}, {:?}): Ignoring incoming index from the future, we are awaiting {:?}.",
index, incoming.peer_id, incoming.set_id, incoming_index
);
debug_assert!(false);
return
}
trace!(target: "sub-libp2p", "PSM => Accept({:?}, {}, {:?}): Enabling connections.",
index, incoming.peer_id, incoming.set_id);
@@ -955,7 +950,24 @@ impl Notifications {
match mem::replace(state, PeerState::Poisoned) {
// Incoming => Disabled
PeerState::Incoming { mut connections, backoff_until } => {
PeerState::Incoming { mut connections, backoff_until, incoming_index } => {
if index < incoming_index {
warn!(
target: "sub-libp2p",
"PSM => Reject({:?}, {}, {:?}): Ignoring obsolete incoming index, we are already awaiting {:?}.",
index, incoming.peer_id, incoming.set_id, incoming_index
);
return
} else if index > incoming_index {
error!(
target: "sub-libp2p",
"PSM => Reject({:?}, {}, {:?}): Ignoring incoming index from the future, we are awaiting {:?}.",
index, incoming.peer_id, incoming.set_id, incoming_index
);
debug_assert!(false);
return
}
trace!(target: "sub-libp2p", "PSM => Reject({:?}, {}, {:?}): Rejecting connections.",
index, incoming.peer_id, incoming.set_id);
@@ -1195,7 +1207,7 @@ impl NetworkBehaviour for Notifications {
},
// Incoming => Incoming | Disabled | Backoff | Ø
PeerState::Incoming { mut connections, backoff_until } => {
PeerState::Incoming { mut connections, backoff_until, incoming_index } => {
trace!(
target: "sub-libp2p",
"Libp2p => Disconnected({}, {:?}, {:?}): OpenDesiredByRemote.",
@@ -1269,8 +1281,11 @@ impl NetworkBehaviour for Notifications {
*entry.get_mut() =
PeerState::Disabled { connections, backoff_until };
} else {
*entry.get_mut() =
PeerState::Incoming { connections, backoff_until };
*entry.get_mut() = PeerState::Incoming {
connections,
backoff_until,
incoming_index,
};
}
},
@@ -1489,7 +1504,7 @@ impl NetworkBehaviour for Notifications {
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
// Incoming => Incoming
PeerState::Incoming { mut connections, backoff_until } => {
PeerState::Incoming { mut connections, backoff_until, incoming_index } => {
debug_assert!(connections
.iter()
.any(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)));
@@ -1517,7 +1532,8 @@ impl NetworkBehaviour for Notifications {
debug_assert!(false);
}
*entry.into_mut() = PeerState::Incoming { connections, backoff_until };
*entry.into_mut() =
PeerState::Incoming { connections, backoff_until, incoming_index };
},
PeerState::Enabled { mut connections } => {
@@ -1582,8 +1598,11 @@ impl NetworkBehaviour for Notifications {
incoming_id,
});
*entry.into_mut() =
PeerState::Incoming { connections, backoff_until };
*entry.into_mut() = PeerState::Incoming {
connections,
backoff_until,
incoming_index: incoming_id,
};
} else {
// Connections in `OpeningThenClosing` and `Closing` state can be
// in a Closed phase, and as such can emit `OpenDesiredByRemote`
@@ -2087,6 +2106,7 @@ mod tests {
use super::*;
use crate::protocol::notifications::handler::tests::*;
use libp2p::swarm::AddressRecord;
use sc_peerset::IncomingIndex;
use std::{collections::HashSet, iter};
impl PartialEq for ConnectionState {
@@ -2279,7 +2299,7 @@ mod tests {
NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 },
);
if let Some(&PeerState::Incoming { ref connections, backoff_until: None }) =
if let Some(&PeerState::Incoming { ref connections, backoff_until: None, .. }) =
notif.peers.get(&(peer, 0.into()))
{
assert_eq!(connections.len(), 1);
@@ -2424,8 +2444,10 @@ mod tests {
NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 },
);
// attempt to connect to the peer and verify that the peer state is `Enabled`
notif.peerset_report_connect(peer, set_id);
// attempt to connect to the peer and verify that the peer state is `Enabled`;
// we rely on implementation detail that incoming indices are counted from 0
// to not mock the `Peerset`
notif.peerset_report_accept(IncomingIndex(0));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
}
@@ -2502,7 +2524,9 @@ mod tests {
conn,
NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 },
);
notif.peerset_report_connect(peer, set_id);
// we rely on the implementation detail that incoming indices are counted from 0
// to not mock the `Peerset`
notif.peerset_report_accept(IncomingIndex(0));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
// disconnect peer and verify that the state is `Disabled`
@@ -2859,7 +2883,9 @@ mod tests {
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.peerset_report_connect(peer, set_id);
// We rely on the implementation detail that incoming indices are counted
// from 0 to not mock the `Peerset`.
notif.peerset_report_accept(sc_peerset::IncomingIndex(0));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
// open new substream
@@ -2948,7 +2974,6 @@ mod tests {
// check peer information
assert_eq!(notif.open_peers().collect::<Vec<_>>(), vec![&peer],);
assert_eq!(notif.reserved_peers(set_id).collect::<Vec<_>>(), Vec::<&PeerId>::new(),);
assert_eq!(notif.num_discovered_peers(), 0usize);
// close the other connection and verify that notification replacement event is emitted
@@ -3703,7 +3728,9 @@ mod tests {
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.peerset_report_connect(peer, set_id);
// we rely on the implementation detail that incoming indices are counted from 0
// to not mock the `Peerset`
notif.peerset_report_accept(IncomingIndex(0));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. })));
let event = conn_yielder.open_substream(peer, 0, connected, vec![1, 2, 3, 4]);
@@ -3834,7 +3861,6 @@ mod tests {
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn peerset_report_connect_with_disabled_pending_enable_peer() {
let (mut notif, _peerset) = development_notifs();
@@ -3872,11 +3898,15 @@ mod tests {
Some(&PeerState::DisabledPendingEnable { .. })
));
// duplicate "connect" must not change the state
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(
notif.peers.get(&(peer, set_id)),
Some(&PeerState::DisabledPendingEnable { .. })
));
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn peerset_report_connect_with_requested_peer() {
let (mut notif, _peerset) = development_notifs();
@@ -3887,11 +3917,12 @@ mod tests {
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
// Duplicate "connect" must not change the state.
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested)));
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn peerset_report_connect_with_pending_requested() {
let (mut notif, _peerset) = development_notifs();
@@ -3940,11 +3971,50 @@ mod tests {
Some(&PeerState::PendingRequest { .. })
));
// duplicate "connect" must not change the state
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(
notif.peers.get(&(peer, set_id)),
Some(&PeerState::PendingRequest { .. })
));
}
#[test]
#[cfg(debug_assertions)]
fn peerset_report_connect_with_incoming_peer() {
let (mut notif, _peerset) = development_notifs();
let peer = PeerId::random();
let set_id = sc_peerset::SetId::from(0);
let conn = ConnectionId::new_unchecked(0);
let connected = ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
};
notif.on_swarm_event(FromSwarm::ConnectionEstablished(
libp2p::swarm::behaviour::ConnectionEstablished {
peer_id: peer,
connection_id: conn,
endpoint: &connected,
failed_addresses: &[],
other_established: 0usize,
},
));
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. })));
// remote opens a substream, verify that peer state is updated to `Incoming`
notif.on_connection_handler_event(
peer,
conn,
NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 },
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.peerset_report_connect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn peerset_report_disconnect_with_incoming_peer() {
let (mut notif, _peerset) = development_notifs();
@@ -3973,10 +4043,10 @@ mod tests {
conn,
NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 },
);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
notif.peerset_report_disconnect(peer, set_id);
assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. })));
}
#[test]