diff --git a/substrate/client/network/src/protocol/notifications/handler.rs b/substrate/client/network/src/protocol/notifications/handler.rs index 2b350cd7fc..99677cc45e 100644 --- a/substrate/client/network/src/protocol/notifications/handler.rs +++ b/substrate/client/network/src/protocol/notifications/handler.rs @@ -159,16 +159,6 @@ enum State { Closed { /// True if an outgoing substream is still in the process of being opened. pending_opening: bool, - - /// Outbound substream that has been accepted by the remote. Being closed. - out_substream_closing: Option>, - - /// Substream opened by the remote. Being closed. - in_substream_closing: Option>, - - /// Substream re-opened by the remote. Not to be closed after `in_substream_closing` has - /// been closed. - in_substream_reopened: Option>, }, /// Protocol is in the "Closed" state. A [`NotifsHandlerOut::OpenDesiredByRemote`] has been @@ -177,9 +167,6 @@ enum State { /// Substream opened by the remote and that hasn't been accepted/rejected yet. in_substream: NotificationsInSubstream, - /// Outbound substream that has been accepted by the remote. Being closed. - out_substream_closing: Option>, - /// See [`State::Closed::pending_opening`]. pending_opening: bool, }, @@ -190,15 +177,8 @@ enum State { /// A [`NotifsHandlerOut::OpenResultOk`] or a [`NotifsHandlerOut::OpenResultErr`] event must /// be emitted when transitionning to respectively [`State::Open`] or [`State::Closed`]. Opening { - /// Outbound substream that has been accepted by the remote. Being closed. An outbound - /// substream request has been emitted towards libp2p if and only if this field is `None`. - out_substream_closing: Option>, - - /// Substream re-opened by the remote. Has been accepted. - in_substream_reopened: Option>, - - /// Substream opened by the remote. Being closed. - in_substream_closing: Option>, + /// Substream opened by the remote. If `Some`, has been accepted. + in_substream: Option>, }, /// Protocol is in the "Open" state. @@ -247,9 +227,6 @@ impl IntoProtocolsHandler for NotifsHandlerProto { handshake, state: State::Closed { pending_opening: false, - in_substream_closing: None, - in_substream_reopened: None, - out_substream_closing: None, }, max_notification_size: max_size, } @@ -510,16 +487,7 @@ impl ProtocolsHandler for NotifsHandler { ) { let mut protocol_info = &mut self.protocols[protocol_index]; match protocol_info.state { - State::Closed { - ref mut pending_opening, - ref mut out_substream_closing, - ref mut in_substream_closing, - ref mut in_substream_reopened - } - if in_substream_closing.is_none() && in_substream_reopened.is_none() - => { - debug_assert!(!(out_substream_closing.is_some() && *pending_opening)); - + State::Closed { pending_opening } => { self.events_queue.push_back(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::OpenDesiredByRemote { protocol_index, @@ -528,31 +496,9 @@ impl ProtocolsHandler for NotifsHandler { protocol_info.state = State::OpenDesiredByRemote { in_substream: new_substream, - out_substream_closing: out_substream_closing.take(), - pending_opening: *pending_opening, + pending_opening, }; }, - - State::Opening { ref mut in_substream_closing, ref mut in_substream_reopened, .. } => { - *in_substream_closing = None; - - // Create `handshake_message` on a separate line to be sure that the - // lock is released as soon as possible. - let handshake_message = protocol_info.handshake.read().clone(); - new_substream.send_handshake(handshake_message); - *in_substream_reopened = Some(new_substream); - }, - - State::Open { ref mut in_substream, .. } if in_substream.is_none() => { - // Create `handshake_message` on a separate line to be sure that the - // lock is released as soon as possible. - let handshake_message = protocol_info.handshake.read().clone(); - new_substream.send_handshake(handshake_message); - *in_substream = Some(new_substream); - }, - - State::Closed { .. } | - State::Open { .. } | State::OpenDesiredByRemote { .. } => { // If a substream already exists, silently drop the new one. // Note that we drop the substream, which will send an equivalent to a @@ -563,6 +509,19 @@ impl ProtocolsHandler for NotifsHandler { // to do. return; }, + State::Opening { ref mut in_substream, .. } | + State::Open { ref mut in_substream, .. } => { + if in_substream.is_some() { + // Same remark as above. + return; + } + + // Create `handshake_message` on a separate line to be sure that the + // lock is released as soon as possible. + let handshake_message = protocol_info.handshake.read().clone(); + new_substream.send_handshake(handshake_message); + *in_substream = Some(new_substream); + }, } } @@ -572,24 +531,16 @@ impl ProtocolsHandler for NotifsHandler { protocol_index: Self::OutboundOpenInfo ) { match self.protocols[protocol_index].state { - State::Closed { ref mut pending_opening, ref mut out_substream_closing, .. } | - State::OpenDesiredByRemote { ref mut pending_opening, ref mut out_substream_closing, .. } => { - debug_assert!(out_substream_closing.is_none()); + State::Closed { ref mut pending_opening } | + State::OpenDesiredByRemote { ref mut pending_opening, .. } => { debug_assert!(*pending_opening); - *out_substream_closing = Some(substream); *pending_opening = false; } State::Open { .. } => { error!(target: "sub-libp2p", "☎️ State mismatch in notifications handler"); debug_assert!(false); } - State::Opening { - ref mut in_substream_reopened, ref mut in_substream_closing, - ref mut out_substream_closing - } => { - debug_assert!(out_substream_closing.is_none()); - debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some())); - + State::Opening { ref mut in_substream } => { let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); let notifications_sink = NotificationsSink { @@ -603,7 +554,7 @@ impl ProtocolsHandler for NotifsHandler { self.protocols[protocol_index].state = State::Open { notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(), out_substream: Some(substream), - in_substream: in_substream_reopened.take().or(in_substream_closing.take()), + in_substream: in_substream.take(), }; self.events_queue.push_back(ProtocolsHandlerEvent::Custom( @@ -623,13 +574,8 @@ impl ProtocolsHandler for NotifsHandler { NotifsHandlerIn::Open { protocol_index } => { let protocol_info = &mut self.protocols[protocol_index]; match &mut protocol_info.state { - State::Closed { - ref mut pending_opening, - ref mut in_substream_closing, - ref mut in_substream_reopened, - ref mut out_substream_closing - } => { - if !*pending_opening && out_substream_closing.is_none() { + State::Closed { pending_opening } => { + if !*pending_opening { let proto = NotificationsOut::new( protocol_info.name.clone(), protocol_info.handshake.read().clone(), @@ -642,31 +588,14 @@ impl ProtocolsHandler for NotifsHandler { }); } - debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some())); protocol_info.state = State::Opening { - in_substream_closing: in_substream_closing.take(), - in_substream_reopened: in_substream_reopened.take(), - out_substream_closing: out_substream_closing.take(), + in_substream: None, }; }, - - State::OpenDesiredByRemote { .. } => { - // The state change is done in two steps because of borrowing issues. - let (pending_opening, out_substream_closing, mut in_substream) = match - mem::replace(&mut protocol_info.state, - State::Opening { - in_substream_closing: None, in_substream_reopened: None, - out_substream_closing: None, - }) - { - State::OpenDesiredByRemote { pending_opening, out_substream_closing, in_substream, .. } => - (pending_opening, out_substream_closing, in_substream), - _ => unreachable!() - }; - + State::OpenDesiredByRemote { pending_opening, in_substream } => { let handshake_message = protocol_info.handshake.read().clone(); - if !pending_opening && out_substream_closing.is_none() { + if !*pending_opening { let proto = NotificationsOut::new( protocol_info.name.clone(), handshake_message.clone(), @@ -681,13 +610,17 @@ impl ProtocolsHandler for NotifsHandler { in_substream.send_handshake(handshake_message); + // The state change is done in two steps because of borrowing issues. + let in_substream = match + mem::replace(&mut protocol_info.state, State::Opening { in_substream: None }) + { + State::OpenDesiredByRemote { in_substream, .. } => in_substream, + _ => unreachable!() + }; protocol_info.state = State::Opening { - out_substream_closing, - in_substream_closing: None, - in_substream_reopened: Some(in_substream), + in_substream: Some(in_substream), }; }, - State::Opening { .. } | State::Open { .. } => { // As documented, it is forbidden to send an `Open` while there is already @@ -699,30 +632,15 @@ impl ProtocolsHandler for NotifsHandler { }, NotifsHandlerIn::Close { protocol_index } => { - match &mut self.protocols[protocol_index].state { - State::Open { in_substream, out_substream, .. } => { - if let Some(in_substream) = in_substream.as_mut() { - in_substream.set_close_desired(); - } + match self.protocols[protocol_index].state { + State::Open { .. } => { self.protocols[protocol_index].state = State::Closed { - in_substream_closing: in_substream.take(), - in_substream_reopened: None, - out_substream_closing: out_substream.take(), pending_opening: false, }; }, - State::Opening { in_substream_closing, in_substream_reopened, out_substream_closing } => { - debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some())); - - let pending_opening = out_substream_closing.is_none(); - if let Some(in_substream_reopened) = in_substream_reopened.as_mut() { - in_substream_reopened.set_close_desired(); - } + State::Opening { .. } => { self.protocols[protocol_index].state = State::Closed { - in_substream_closing: in_substream_reopened.take().or(in_substream_closing.take()), - in_substream_reopened: None, - out_substream_closing: out_substream_closing.take(), - pending_opening, + pending_opening: true, }; self.events_queue.push_back(ProtocolsHandlerEvent::Custom( @@ -731,23 +649,8 @@ impl ProtocolsHandler for NotifsHandler { } )); }, - State::OpenDesiredByRemote { .. } => { - let (mut in_substream, pending_opening, out_substream_closing) = match mem::replace( - &mut self.protocols[protocol_index].state, - State::Closed { pending_opening: false, in_substream_closing: None, - in_substream_reopened: None, out_substream_closing: None, - } - ) { - State::OpenDesiredByRemote { in_substream, pending_opening, out_substream_closing } => - (in_substream, pending_opening, out_substream_closing), - _ => unreachable!("Can only enter this branch after OpenDesiredByRemote; qed") - }; - - in_substream.set_close_desired(); + State::OpenDesiredByRemote { pending_opening, .. } => { self.protocols[protocol_index].state = State::Closed { - in_substream_closing: Some(in_substream), - in_substream_reopened: None, - out_substream_closing, pending_opening, }; } @@ -769,30 +672,14 @@ impl ProtocolsHandler for NotifsHandler { _: ProtocolsHandlerUpgrErr ) { match self.protocols[num].state { - State::Closed { ref mut pending_opening, ref mut out_substream_closing, .. } | - State::OpenDesiredByRemote { ref mut pending_opening, ref mut out_substream_closing, .. } => { - debug_assert!(out_substream_closing.is_none()); + State::Closed { ref mut pending_opening } | + State::OpenDesiredByRemote { ref mut pending_opening, .. } => { debug_assert!(*pending_opening); *pending_opening = false; } - State::Opening { - ref mut out_substream_closing, - ref mut in_substream_closing, - ref mut in_substream_reopened, - .. - } => { - debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some())); - debug_assert!(out_substream_closing.is_none()); - - if let Some(in_substream_reopened) = in_substream_reopened.as_mut() { - in_substream_reopened.set_close_desired(); - } - + State::Opening { .. } => { self.protocols[num].state = State::Closed { - out_substream_closing: None, - in_substream_closing: in_substream_reopened.take().or(in_substream_closing.take()), - in_substream_reopened: None, pending_opening: false, }; @@ -901,67 +788,15 @@ impl ProtocolsHandler for NotifsHandler { } } - // Try close outbound substreams that are marked for closing. - for protocol_index in 0..self.protocols.len() { - match &mut self.protocols[protocol_index].state { - State::Closed { out_substream_closing: ref mut substream @ Some(_), .. } | - State::OpenDesiredByRemote { out_substream_closing: ref mut substream @ Some(_), .. } | - State::Opening { out_substream_closing: ref mut substream @ Some(_), .. } => { - match Sink::poll_close(Pin::new(substream.as_mut().unwrap()), cx) { - Poll::Pending => {}, - Poll::Ready(_) => { - *substream = None; - - if matches!(self.protocols[protocol_index].state, State::Opening { .. }) { - let protocol_info = &mut self.protocols[protocol_index]; - let proto = NotificationsOut::new( - protocol_info.name.clone(), - protocol_info.handshake.read().clone(), - protocol_info.max_notification_size - ); - - self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(proto, protocol_index) - .with_timeout(OPEN_TIMEOUT), - }); - } - } - } - } - _ => {} - } - - if let State::Closed { - pending_opening, - out_substream_closing: None, - in_substream_closing, - in_substream_reopened: ref mut in_substream_reopened @ Some(_), - .. - } = &mut self.protocols[protocol_index].state { - debug_assert!(!*pending_opening); - debug_assert!(in_substream_closing.is_none()); - - self.events_queue.push_back(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::OpenDesiredByRemote { - protocol_index, - } - )); - - self.protocols[protocol_index].state = State::OpenDesiredByRemote { - in_substream: in_substream_reopened.take() - .expect("The if let above ensures that this is Some ; qed"), - out_substream_closing: None, - pending_opening: false, - }; - } - } - // Poll inbound substreams. for protocol_index in 0..self.protocols.len() { // Inbound substreams being closed is always tolerated, except for the // `OpenDesiredByRemote` state which might need to be switched back to `Closed`. match &mut self.protocols[protocol_index].state { - State::Open { in_substream: None, .. } => {} + State::Closed { .. } | + State::Open { in_substream: None, .. } | + State::Opening { in_substream: None } => {} + State::Open { in_substream: in_substream @ Some(_), .. } => { match Stream::poll_next(Pin::new(in_substream.as_mut().unwrap()), cx) { Poll::Pending => {}, @@ -977,16 +812,13 @@ impl ProtocolsHandler for NotifsHandler { } } - State::OpenDesiredByRemote { in_substream, pending_opening, out_substream_closing } => { + State::OpenDesiredByRemote { in_substream, pending_opening } => { match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) { Poll::Pending => {}, Poll::Ready(Ok(void)) => match void {}, Poll::Ready(Err(_)) => { self.protocols[protocol_index].state = State::Closed { pending_opening: *pending_opening, - in_substream_closing: None, - in_substream_reopened: None, - out_substream_closing: out_substream_closing.take(), }; return Poll::Ready(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::CloseDesired { protocol_index } @@ -995,40 +827,13 @@ impl ProtocolsHandler for NotifsHandler { } } - State::Opening { in_substream_closing: None, in_substream_reopened: None, .. } | - State::Closed { in_substream_closing: None, in_substream_reopened: None, .. } => {} - - State::Opening { - in_substream_closing: ref mut in_substream @ Some(_), - in_substream_reopened: None, - .. - } | - State::Opening { - in_substream_closing: None, - in_substream_reopened: ref mut in_substream @ Some(_), - .. - } | - State::Closed { - in_substream_closing: ref mut in_substream @ Some(_), - in_substream_reopened: None, - .. - } | - State::Closed { - in_substream_closing: None, - in_substream_reopened: ref mut in_substream @ Some(_), - .. - } => { + State::Opening { in_substream: in_substream @ Some(_), .. } => { match NotificationsInSubstream::poll_process(Pin::new(in_substream.as_mut().unwrap()), cx) { Poll::Pending => {}, Poll::Ready(Ok(void)) => match void {}, Poll::Ready(Err(_)) => *in_substream = None, } } - - State::Opening { in_substream_closing: Some(_), in_substream_reopened: Some(_), .. } | - State::Closed { in_substream_closing: Some(_), in_substream_reopened: Some(_), .. } => { - debug_assert!(false); - } } } diff --git a/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs b/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs index f76472a0de..eba96441bc 100644 --- a/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs +++ b/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs @@ -88,11 +88,10 @@ enum NotificationsInSubstreamHandshake { PendingSend(Vec), /// Handshake message was pushed in the socket. Still need to flush. Flush, - /// Ready to receive notifications. Handshake message successfully sent and flushed, or - /// sending side closed before handshake sent. - Normal { write_side_open: bool }, - /// Closing our writing side. - Closing { remote_write_open: bool }, + /// Handshake message successfully sent and flushed. + Sent, + /// Remote has closed their writing side. We close our own writing side in return. + ClosingInResponseToRemote, /// Both our side and the remote have closed their writing side. BothSidesClosed, } @@ -170,30 +169,8 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, impl NotificationsInSubstream where TSubstream: AsyncRead + AsyncWrite + Unpin, { - /// Closes the writing side of the substream, indicating to the remote that we would like this - /// substream to be closed. - pub fn set_close_desired(&mut self) { - match self.handshake { - NotificationsInSubstreamHandshake::PendingSend(_) | - NotificationsInSubstreamHandshake::Flush | - NotificationsInSubstreamHandshake::NotSent | - NotificationsInSubstreamHandshake::Normal { write_side_open: true } => { - self.handshake = NotificationsInSubstreamHandshake::Closing { remote_write_open: true }; - } - NotificationsInSubstreamHandshake::Normal { write_side_open: false } | - NotificationsInSubstreamHandshake::Closing { .. } | - NotificationsInSubstreamHandshake::BothSidesClosed => {} - } - } - /// Sends the handshake in order to inform the remote that we accept the substream. - /// - /// Has no effect if `set_close_desired` has been called. pub fn send_handshake(&mut self, message: impl Into>) { - if matches!(self.handshake, NotificationsInSubstreamHandshake::Normal { write_side_open: false }) { - return; - } - if !matches!(self.handshake, NotificationsInSubstreamHandshake::NotSent) { error!(target: "sub-libp2p", "Tried to send handshake twice"); return; @@ -208,7 +185,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, let mut this = self.project(); loop { - match mem::replace(this.handshake, NotificationsInSubstreamHandshake::NotSent) { + match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) { NotificationsInSubstreamHandshake::PendingSend(msg) => match Sink::poll_ready(this.socket.as_mut(), cx) { Poll::Ready(_) => { @@ -226,28 +203,16 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, NotificationsInSubstreamHandshake::Flush => match Sink::poll_flush(this.socket.as_mut(), cx)? { Poll::Ready(()) => - *this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: true }, + *this.handshake = NotificationsInSubstreamHandshake::Sent, Poll::Pending => { *this.handshake = NotificationsInSubstreamHandshake::Flush; return Poll::Pending } }, - NotificationsInSubstreamHandshake::Closing { remote_write_open } => - match Sink::poll_close(this.socket.as_mut(), cx)? { - Poll::Ready(()) => if remote_write_open { - *this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: false } - } else { - *this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed; - }, - Poll::Pending => { - *this.handshake = NotificationsInSubstreamHandshake::Closing { remote_write_open }; - return Poll::Pending - } - }, - st @ NotificationsInSubstreamHandshake::NotSent | - st @ NotificationsInSubstreamHandshake::Normal { .. } | + st @ NotificationsInSubstreamHandshake::Sent | + st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote | st @ NotificationsInSubstreamHandshake::BothSidesClosed => { *this.handshake = st; return Poll::Pending; @@ -267,7 +232,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, // This `Stream` implementation first tries to send back the handshake if necessary. loop { - match mem::replace(this.handshake, NotificationsInSubstreamHandshake::NotSent) { + match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) { NotificationsInSubstreamHandshake::NotSent => { *this.handshake = NotificationsInSubstreamHandshake::NotSent; return Poll::Pending @@ -289,39 +254,34 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, NotificationsInSubstreamHandshake::Flush => match Sink::poll_flush(this.socket.as_mut(), cx)? { Poll::Ready(()) => - *this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: true }, + *this.handshake = NotificationsInSubstreamHandshake::Sent, Poll::Pending => { *this.handshake = NotificationsInSubstreamHandshake::Flush; return Poll::Pending } }, - NotificationsInSubstreamHandshake::Normal { write_side_open } => { + NotificationsInSubstreamHandshake::Sent => { match Stream::poll_next(this.socket.as_mut(), cx) { - Poll::Ready(None) if write_side_open => - *this.handshake = - NotificationsInSubstreamHandshake::Closing { remote_write_open: false }, - Poll::Ready(None) => - *this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed, + Poll::Ready(None) => *this.handshake = + NotificationsInSubstreamHandshake::ClosingInResponseToRemote, Poll::Ready(Some(msg)) => { - *this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open }; + *this.handshake = NotificationsInSubstreamHandshake::Sent; return Poll::Ready(Some(msg)) }, Poll::Pending => { - *this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open }; + *this.handshake = NotificationsInSubstreamHandshake::Sent; return Poll::Pending }, } }, - NotificationsInSubstreamHandshake::Closing { remote_write_open } => + NotificationsInSubstreamHandshake::ClosingInResponseToRemote => match Sink::poll_close(this.socket.as_mut(), cx)? { - Poll::Ready(()) if remote_write_open => - *this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: false }, Poll::Ready(()) => *this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed, Poll::Pending => { - *this.handshake = NotificationsInSubstreamHandshake::Closing { remote_write_open }; + *this.handshake = NotificationsInSubstreamHandshake::ClosingInResponseToRemote; return Poll::Pending } },