From f008e0698529b969f633a9e941dfe48dc6316e12 Mon Sep 17 00:00:00 2001 From: Aaro Altonen <48052676+altonen@users.noreply.github.com> Date: Mon, 24 Jul 2023 10:47:37 +0300 Subject: [PATCH] Accept only `--in-peers` many inbound full nodes in `SyncingEngine` (#14603) * Accept only `--in-peers` many inbound full nodes in `SyncingEngine` Due to full and light nodes being stored in the same set, it's possible that `SyncingEngine` accepts more than `--in-peers` many inbound full nodes which leaves some of its outbound slots unoccupied. `ProtocolController` still tries to occupy these slots by opening outbound substreams. As these substreams are accepted by the remote peer, the connection is relayed to `SyncingEngine` which rejects the node because it's already full. This in turn results in the substream being inactive and the peer getting evicted. Fixing this properly would require relocating the light peer slot allocation away from `ProtocolController` or alternatively moving the entire substream validation there, both of which are epic refactorings and not necessarily in line with other goals. As a temporary measure, verify in `SyncingEngine` that it doesn't accept more than the specified number of inbound full peers. * Fix tests * Apply review comments --- substrate/client/network/src/event.rs | 2 + substrate/client/network/src/protocol.rs | 3 ++ .../src/protocol/notifications/behaviour.rs | 4 ++ .../src/protocol/notifications/handler.rs | 31 ++++++----- substrate/client/network/sync/src/engine.rs | 53 +++++++++++++++++-- 5 files changed, 78 insertions(+), 15 deletions(-) diff --git a/substrate/client/network/src/event.rs b/substrate/client/network/src/event.rs index 9c1034ea3d..2913f0b552 100644 --- a/substrate/client/network/src/event.rs +++ b/substrate/client/network/src/event.rs @@ -106,6 +106,8 @@ pub enum SyncEvent { received_handshake: BlockAnnouncesHandshake, /// Notification sink. sink: NotificationsSink, + /// Is the connection inbound. 
+ inbound: bool, /// Channel for reporting accept/reject of the substream. tx: oneshot::Sender, }, diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index e57bc3e520..0e444c2d33 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -451,6 +451,7 @@ impl NetworkBehaviour for Protocol { received_handshake, notifications_sink, negotiated_fallback, + inbound, } => { // Set number 0 is hardcoded the default set of peers we sync from. if set_id == HARDCODED_PEERSETS_SYNC { @@ -470,6 +471,7 @@ impl NetworkBehaviour for Protocol { let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send( crate::SyncEvent::NotificationStreamOpened { + inbound, remote: peer_id, received_handshake: handshake, sink: notifications_sink, @@ -510,6 +512,7 @@ impl NetworkBehaviour for Protocol { let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send( crate::SyncEvent::NotificationStreamOpened { + inbound, remote: peer_id, received_handshake: handshake, sink: notifications_sink, diff --git a/substrate/client/network/src/protocol/notifications/behaviour.rs b/substrate/client/network/src/protocol/notifications/behaviour.rs index 1659626cae..dd0971aef5 100644 --- a/substrate/client/network/src/protocol/notifications/behaviour.rs +++ b/substrate/client/network/src/protocol/notifications/behaviour.rs @@ -319,6 +319,8 @@ pub enum NotificationsOut { received_handshake: Vec, /// Object that permits sending notifications to the peer. notifications_sink: NotificationsSink, + /// Is the connection inbound. + inbound: bool, }, /// The [`NotificationsSink`] object used to send notifications with the given peer must be @@ -1810,6 +1812,7 @@ impl NetworkBehaviour for Notifications { negotiated_fallback, received_handshake, notifications_sink, + inbound, .. 
} => { let set_id = crate::peerset::SetId::from(protocol_index); @@ -1834,6 +1837,7 @@ impl NetworkBehaviour for Notifications { let event = NotificationsOut::CustomProtocolOpen { peer_id, set_id, + inbound, negotiated_fallback, received_handshake, notifications_sink: notifications_sink.clone(), diff --git a/substrate/client/network/src/protocol/notifications/handler.rs b/substrate/client/network/src/protocol/notifications/handler.rs index 0ac2e250a2..cffdec7d71 100644 --- a/substrate/client/network/src/protocol/notifications/handler.rs +++ b/substrate/client/network/src/protocol/notifications/handler.rs @@ -203,6 +203,8 @@ enum State { Opening { /// Substream opened by the remote. If `Some`, has been accepted. in_substream: Option>, + /// Is the connection inbound. + inbound: bool, }, /// Protocol is in the "Open" state. @@ -276,6 +278,8 @@ pub enum NotifsHandlerOut { received_handshake: Vec, /// How notifications can be sent to this node. notifications_sink: NotificationsSink, + /// Is the connection inbound. + inbound: bool, }, /// Acknowledges a [`NotifsHandlerIn::Open`]. 
The remote has refused the attempt to open @@ -518,7 +522,7 @@ impl ConnectionHandler for NotifsHandler { error!(target: "sub-libp2p", "☎️ State mismatch in notifications handler"); debug_assert!(false); }, - State::Opening { ref mut in_substream } => { + State::Opening { ref mut in_substream, inbound } => { let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); let notifications_sink = NotificationsSink { @@ -543,6 +547,7 @@ impl ConnectionHandler for NotifsHandler { endpoint: self.endpoint.clone(), received_handshake: new_open.handshake, notifications_sink, + inbound, }, )); }, @@ -597,7 +602,7 @@ impl ConnectionHandler for NotifsHandler { ); } - protocol_info.state = State::Opening { in_substream: None }; + protocol_info.state = State::Opening { in_substream: None, inbound: false }; }, State::OpenDesiredByRemote { pending_opening, in_substream } => { let handshake_message = protocol_info.config.handshake.read().clone(); @@ -623,12 +628,13 @@ impl ConnectionHandler for NotifsHandler { // The state change is done in two steps because of borrowing issues. let in_substream = match mem::replace( &mut protocol_info.state, - State::Opening { in_substream: None }, + State::Opening { in_substream: None, inbound: false }, ) { State::OpenDesiredByRemote { in_substream, .. } => in_substream, _ => unreachable!(), }; - protocol_info.state = State::Opening { in_substream: Some(in_substream) }; + protocol_info.state = + State::Opening { in_substream: Some(in_substream), inbound: true }; }, State::Opening { .. } | State::Open { .. } => { // As documented, it is forbidden to send an `Open` while there is already @@ -772,7 +778,7 @@ impl ConnectionHandler for NotifsHandler { match &mut self.protocols[protocol_index].state { State::Closed { .. } | State::Open { in_substream: None, .. } | - State::Opening { in_substream: None } => {}, + State::Opening { in_substream: None, .. 
} => {}, State::Open { in_substream: in_substream @ Some(_), .. } => match Stream::poll_next(Pin::new(in_substream.as_mut().unwrap()), cx) { @@ -893,6 +899,7 @@ pub mod tests { endpoint, received_handshake, notifications_sink, + inbound: false, } } @@ -1131,7 +1138,7 @@ pub mod tests { handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); assert!(std::matches!( handler.protocols[0].state, - State::Opening { in_substream: Some(_) } + State::Opening { in_substream: Some(_), .. } )); // remote now tries to open another substream, verify that it is rejected and closed @@ -1168,7 +1175,7 @@ pub mod tests { .await; assert!(std::matches!( handler.protocols[0].state, - State::Opening { in_substream: Some(_) } + State::Opening { in_substream: Some(_), .. } )); } @@ -1204,7 +1211,7 @@ pub mod tests { handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); assert!(std::matches!( handler.protocols[0].state, - State::Opening { in_substream: Some(_) } + State::Opening { in_substream: Some(_), .. } )); // accept the substream and move its state to `Open` @@ -1295,7 +1302,7 @@ pub mod tests { handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); assert!(std::matches!( handler.protocols[0].state, - State::Opening { in_substream: Some(_) } + State::Opening { in_substream: Some(_), .. } )); handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); @@ -1355,7 +1362,7 @@ pub mod tests { handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); assert!(std::matches!( handler.protocols[0].state, - State::Opening { in_substream: Some(_) } + State::Opening { in_substream: Some(_), .. 
} )); handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); @@ -1438,7 +1445,7 @@ pub mod tests { handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); assert!(std::matches!( handler.protocols[0].state, - State::Opening { in_substream: Some(_) } + State::Opening { in_substream: Some(_), .. } )); handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); @@ -1487,7 +1494,7 @@ pub mod tests { handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); assert!(std::matches!( handler.protocols[0].state, - State::Opening { in_substream: Some(_) } + State::Opening { in_substream: Some(_), .. } )); handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 1c57e4a275..47d920771e 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -178,6 +178,8 @@ pub struct Peer { last_notification_sent: Instant, /// Instant when the last notification was received from peer. last_notification_received: Instant, + /// Is the peer inbound. + inbound: bool, } pub struct SyncingEngine { @@ -238,6 +240,12 @@ pub struct SyncingEngine { /// Number of slots to allocate to light nodes. default_peers_set_num_light: usize, + /// Maximum number of inbound peers. + max_in_peers: usize, + + /// Number of inbound peers accepted so far. + num_in_peers: usize, + /// A cache for the data that was associated to a block announcement. 
block_announce_data_cache: LruMap>, @@ -370,6 +378,12 @@ where .flatten() .expect("Genesis block exists; qed"); + // `default_peers_set.in_peers` contains an unspecified amount of light peers so the number + // of full inbound peers must be calculated from the total full peer count + let max_full_peers = net_config.network_config.default_peers_set_num_full; + let max_out_peers = net_config.network_config.default_peers_set.out_peers; + let max_in_peers = (max_full_peers - max_out_peers) as usize; + Ok(( Self { roles, @@ -391,6 +405,8 @@ where default_peers_set_no_slot_peers, default_peers_set_num_full, default_peers_set_num_light, + num_in_peers: 0usize, + max_in_peers, event_streams: Vec::new(), tick_timeout: Delay::new(TICK_TIMEOUT), syncing_started: None, @@ -718,8 +734,9 @@ where remote, received_handshake, sink, + inbound, tx, - } => match self.on_sync_peer_connected(remote, &received_handshake, sink) { + } => match self.on_sync_peer_connected(remote, &received_handshake, sink, inbound) { Ok(()) => { let _ = tx.send(true); }, @@ -788,15 +805,31 @@ where /// /// Returns a result if the handshake of this peer was indeed accepted. 
pub fn on_sync_peer_disconnected(&mut self, peer: PeerId) -> Result<(), ()> { - if self.peers.remove(&peer).is_some() { + if let Some(info) = self.peers.remove(&peer) { if self.important_peers.contains(&peer) { log::warn!(target: "sync", "Reserved peer {} disconnected", peer); } else { log::debug!(target: "sync", "{} disconnected", peer); } + if !self.default_peers_set_no_slot_connected_peers.remove(&peer) && + info.inbound && info.info.roles.is_full() + { + match self.num_in_peers.checked_sub(1) { + Some(value) => { + self.num_in_peers = value; + }, + None => { + log::error!( + target: "sync", + "trying to disconnect an inbound node which is not counted as inbound" + ); + debug_assert!(false); + }, + } + } + self.chain_sync.peer_disconnected(&peer); - self.default_peers_set_no_slot_connected_peers.remove(&peer); self.event_streams .retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer)).is_ok()); Ok(()) @@ -815,6 +848,7 @@ where who: PeerId, status: &BlockAnnouncesHandshake, sink: NotificationsSink, + inbound: bool, ) -> Result<(), ()> { log::trace!(target: "sync", "New peer {} {:?}", who, status); @@ -857,6 +891,15 @@ where let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&who); let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 }; + // make sure to accept no more than `--in-peers` many full nodes + if !no_slot_peer && + status.roles.is_full() && + inbound && self.num_in_peers == self.max_in_peers + { + log::debug!(target: "sync", "All inbound slots have been consumed, rejecting {who}"); + return Err(()) + } + if status.roles.is_full() && self.chain_sync.num_peers() >= self.default_peers_set_num_full + @@ -887,6 +930,7 @@ where sink, last_notification_sent: Instant::now(), last_notification_received: Instant::now(), + inbound, }; let req = if peer.info.roles.is_full() { @@ -904,8 +948,11 @@ where log::debug!(target: "sync", "Connected {}", who); self.peers.insert(who, peer); + if no_slot_peer { 
self.default_peers_set_no_slot_connected_peers.insert(who); + } else if inbound && status.roles.is_full() { + self.num_in_peers += 1; } if let Some(req) = req {