diff --git a/substrate/client/network/src/event.rs b/substrate/client/network/src/event.rs index 9c1034ea3d..2913f0b552 100644 --- a/substrate/client/network/src/event.rs +++ b/substrate/client/network/src/event.rs @@ -106,6 +106,8 @@ pub enum SyncEvent { received_handshake: BlockAnnouncesHandshake, /// Notification sink. sink: NotificationsSink, + /// Is the connection inbound. + inbound: bool, /// Channel for reporting accept/reject of the substream. tx: oneshot::Sender, }, diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index e57bc3e520..0e444c2d33 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -451,6 +451,7 @@ impl NetworkBehaviour for Protocol { received_handshake, notifications_sink, negotiated_fallback, + inbound, } => { // Set number 0 is hardcoded the default set of peers we sync from. if set_id == HARDCODED_PEERSETS_SYNC { @@ -470,6 +471,7 @@ impl NetworkBehaviour for Protocol { let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send( crate::SyncEvent::NotificationStreamOpened { + inbound, remote: peer_id, received_handshake: handshake, sink: notifications_sink, @@ -510,6 +512,7 @@ impl NetworkBehaviour for Protocol { let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send( crate::SyncEvent::NotificationStreamOpened { + inbound, remote: peer_id, received_handshake: handshake, sink: notifications_sink, diff --git a/substrate/client/network/src/protocol/notifications/behaviour.rs b/substrate/client/network/src/protocol/notifications/behaviour.rs index 1659626cae..dd0971aef5 100644 --- a/substrate/client/network/src/protocol/notifications/behaviour.rs +++ b/substrate/client/network/src/protocol/notifications/behaviour.rs @@ -319,6 +319,8 @@ pub enum NotificationsOut { received_handshake: Vec, /// Object that permits sending notifications to the peer. notifications_sink: NotificationsSink, + /// Is the connection inbound. + inbound: bool, }, /// The [`NotificationsSink`] object used to send notifications with the given peer must be @@ -1810,6 +1812,7 @@ impl NetworkBehaviour for Notifications { negotiated_fallback, received_handshake, notifications_sink, + inbound, .. } => { let set_id = crate::peerset::SetId::from(protocol_index); @@ -1834,6 +1837,7 @@ impl NetworkBehaviour for Notifications { let event = NotificationsOut::CustomProtocolOpen { peer_id, set_id, + inbound, negotiated_fallback, received_handshake, notifications_sink: notifications_sink.clone(), diff --git a/substrate/client/network/src/protocol/notifications/handler.rs b/substrate/client/network/src/protocol/notifications/handler.rs index 0ac2e250a2..cffdec7d71 100644 --- a/substrate/client/network/src/protocol/notifications/handler.rs +++ b/substrate/client/network/src/protocol/notifications/handler.rs @@ -203,6 +203,8 @@ enum State { Opening { /// Substream opened by the remote. If `Some`, has been accepted. in_substream: Option>, + /// Is the connection inbound. + inbound: bool, }, /// Protocol is in the "Open" state. @@ -276,6 +278,8 @@ pub enum NotifsHandlerOut { received_handshake: Vec, /// How notifications can be sent to this node. notifications_sink: NotificationsSink, + /// Is the connection inbound. + inbound: bool, }, /// Acknowledges a [`NotifsHandlerIn::Open`]. The remote has refused the attempt to open @@ -518,7 +522,7 @@ impl ConnectionHandler for NotifsHandler { error!(target: "sub-libp2p", "☎️ State mismatch in notifications handler"); debug_assert!(false); }, - State::Opening { ref mut in_substream } => { + State::Opening { ref mut in_substream, inbound } => { let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); let notifications_sink = NotificationsSink { @@ -543,6 +547,7 @@ impl ConnectionHandler for NotifsHandler { endpoint: self.endpoint.clone(), received_handshake: new_open.handshake, notifications_sink, + inbound, }, )); }, @@ -597,7 +602,7 @@ impl ConnectionHandler for NotifsHandler { ); } - protocol_info.state = State::Opening { in_substream: None }; + protocol_info.state = State::Opening { in_substream: None, inbound: false }; }, State::OpenDesiredByRemote { pending_opening, in_substream } => { let handshake_message = protocol_info.config.handshake.read().clone(); @@ -623,12 +628,13 @@ impl ConnectionHandler for NotifsHandler { // The state change is done in two steps because of borrowing issues. let in_substream = match mem::replace( &mut protocol_info.state, - State::Opening { in_substream: None }, + State::Opening { in_substream: None, inbound: false }, ) { State::OpenDesiredByRemote { in_substream, .. } => in_substream, _ => unreachable!(), }; - protocol_info.state = State::Opening { in_substream: Some(in_substream) }; + protocol_info.state = + State::Opening { in_substream: Some(in_substream), inbound: true }; }, State::Opening { .. } | State::Open { .. } => { // As documented, it is forbidden to send an `Open` while there is already @@ -772,7 +778,7 @@ impl ConnectionHandler for NotifsHandler { match &mut self.protocols[protocol_index].state { State::Closed { .. } | State::Open { in_substream: None, .. } | - State::Opening { in_substream: None } => {}, + State::Opening { in_substream: None, .. } => {}, State::Open { in_substream: in_substream @ Some(_), .. } => match Stream::poll_next(Pin::new(in_substream.as_mut().unwrap()), cx) { @@ -893,6 +899,7 @@ pub mod tests { endpoint, received_handshake, notifications_sink, + inbound: false, } } @@ -1131,7 +1138,7 @@ pub mod tests { handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); assert!(std::matches!( handler.protocols[0].state, - State::Opening { in_substream: Some(_) } + State::Opening { in_substream: Some(_), .. } )); // remote now tries to open another substream, verify that it is rejected and closed @@ -1168,7 +1175,7 @@ pub mod tests { .await; assert!(std::matches!( handler.protocols[0].state, - State::Opening { in_substream: Some(_) } + State::Opening { in_substream: Some(_), .. } )); } @@ -1204,7 +1211,7 @@ pub mod tests { handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); assert!(std::matches!( handler.protocols[0].state, - State::Opening { in_substream: Some(_) } + State::Opening { in_substream: Some(_), .. } )); // accept the substream and move its state to `Open` @@ -1295,7 +1302,7 @@ pub mod tests { handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); assert!(std::matches!( handler.protocols[0].state, - State::Opening { in_substream: Some(_) } + State::Opening { in_substream: Some(_), .. } )); handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); @@ -1355,7 +1362,7 @@ pub mod tests { handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); assert!(std::matches!( handler.protocols[0].state, - State::Opening { in_substream: Some(_) } + State::Opening { in_substream: Some(_), .. } )); handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); @@ -1438,7 +1445,7 @@ pub mod tests { handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); assert!(std::matches!( handler.protocols[0].state, - State::Opening { in_substream: Some(_) } + State::Opening { in_substream: Some(_), .. } )); handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); @@ -1487,7 +1494,7 @@ pub mod tests { handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); assert!(std::matches!( handler.protocols[0].state, - State::Opening { in_substream: Some(_) } + State::Opening { in_substream: Some(_), .. } )); handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 1c57e4a275..47d920771e 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -178,6 +178,8 @@ pub struct Peer { last_notification_sent: Instant, /// Instant when the last notification was received from peer. last_notification_received: Instant, + /// Is the peer inbound. + inbound: bool, } pub struct SyncingEngine { @@ -238,6 +240,12 @@ pub struct SyncingEngine { /// Number of slots to allocate to light nodes. default_peers_set_num_light: usize, + /// Maximum number of inbound peers. + max_in_peers: usize, + + /// Number of inbound peers accepted so far. + num_in_peers: usize, + /// A cache for the data that was associated to a block announcement. block_announce_data_cache: LruMap>, @@ -370,6 +378,12 @@ where .flatten() .expect("Genesis block exists; qed"); + // `default_peers_set.in_peers` contains an unspecified amount of light peers so the number + // of full inbound peers must be calculated from the total full peer count + let max_full_peers = net_config.network_config.default_peers_set_num_full; + let max_out_peers = net_config.network_config.default_peers_set.out_peers; + let max_in_peers = (max_full_peers - max_out_peers) as usize; + Ok(( Self { roles, @@ -391,6 +405,8 @@ where default_peers_set_no_slot_peers, default_peers_set_num_full, default_peers_set_num_light, + num_in_peers: 0usize, + max_in_peers, event_streams: Vec::new(), tick_timeout: Delay::new(TICK_TIMEOUT), syncing_started: None, @@ -718,8 +734,9 @@ where remote, received_handshake, sink, + inbound, tx, - } => match self.on_sync_peer_connected(remote, &received_handshake, sink) { + } => match self.on_sync_peer_connected(remote, &received_handshake, sink, inbound) { Ok(()) => { let _ = tx.send(true); }, @@ -788,15 +805,31 @@ where /// /// Returns a result if the handshake of this peer was indeed accepted. pub fn on_sync_peer_disconnected(&mut self, peer: PeerId) -> Result<(), ()> { - if self.peers.remove(&peer).is_some() { + if let Some(info) = self.peers.remove(&peer) { if self.important_peers.contains(&peer) { log::warn!(target: "sync", "Reserved peer {} disconnected", peer); } else { log::debug!(target: "sync", "{} disconnected", peer); } + if !self.default_peers_set_no_slot_connected_peers.remove(&peer) && + info.inbound && info.info.roles.is_full() + { + match self.num_in_peers.checked_sub(1) { + Some(value) => { + self.num_in_peers = value; + }, + None => { + log::error!( + target: "sync", + "trying to disconnect an inbound node which is not counted as inbound" + ); + debug_assert!(false); + }, + } + } + self.chain_sync.peer_disconnected(&peer); - self.default_peers_set_no_slot_connected_peers.remove(&peer); self.event_streams .retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer)).is_ok()); Ok(()) @@ -815,6 +848,7 @@ where who: PeerId, status: &BlockAnnouncesHandshake, sink: NotificationsSink, + inbound: bool, ) -> Result<(), ()> { log::trace!(target: "sync", "New peer {} {:?}", who, status); @@ -857,6 +891,15 @@ where let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&who); let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 }; + // make sure to accept no more than `--in-peers` many full nodes + if !no_slot_peer && + status.roles.is_full() && + inbound && self.num_in_peers == self.max_in_peers + { + log::debug!(target: "sync", "All inbound slots have been consumed, rejecting {who}"); + return Err(()) + } + if status.roles.is_full() && self.chain_sync.num_peers() >= self.default_peers_set_num_full + @@ -887,6 +930,7 @@ where sink, last_notification_sent: Instant::now(), last_notification_received: Instant::now(), + inbound, }; let req = if peer.info.roles.is_full() { @@ -904,8 +948,11 @@ where log::debug!(target: "sync", "Connected {}", who); self.peers.insert(who, peer); + if no_slot_peer { self.default_peers_set_no_slot_connected_peers.insert(who); + } else if inbound && status.roles.is_full() { + self.num_in_peers += 1; } if let Some(req) = req {