mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-28 13:17:56 +00:00
Fix state inconsistency between handler and behaviour (#2220)
* Fix state inconsistency between handler and behaviour * Fix the error! being in the wrong place
This commit is contained in:
@@ -26,6 +26,35 @@ use std::{collections::hash_map::Entry, cmp, error, io, marker::PhantomData, mem
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
/// Network behaviour that handles opening substreams for custom protocols with other nodes.
|
||||
///
|
||||
/// ## How it works
|
||||
///
|
||||
/// The role of the `CustomProto` is to synchronize the following components:
|
||||
///
|
||||
/// - The libp2p swarm that opens new connections and reports disconnects.
|
||||
/// - The connection handler (see `handler.rs`) that handles individual connections.
|
||||
/// - The peerset manager (PSM) that requests links to nodes to be established or broken.
|
||||
/// - The external API, that requires knowledge of the links that have been established.
|
||||
///
|
||||
/// Each connection handler can be in four different states: Enabled+Open, Enabled+Closed,
|
||||
/// Disabled+Open, or Disabled+Closed. The Enabled/Disabled component must be in sync with the
|
||||
/// peerset manager. For example, if the peerset manager requires a disconnection, we disable the
|
||||
/// existing handler. The Open/Closed component must be in sync with the external API.
|
||||
///
|
||||
/// However a connection handler only exists if we are actually connected to a node. What this
|
||||
/// means is that there are six possible states for each node: Disconnected, Dialing (trying to
|
||||
/// reach it), Enabled+Open, Enabled+Closed, Disabled+open, Disabled+Closed. Most notably, the
|
||||
/// Dialing state must correspond to a "link established" state in the peerset manager. In other
|
||||
/// words, the peerset manager doesn't differentiate whether we are dialing a node or connected
|
||||
/// to it.
|
||||
///
|
||||
/// Additionally, there also exists a "banning" system. If we fail to dial a node, we "ban" it for
|
||||
/// a few seconds. If the PSM requests a node that is in the "banned" state, then we delay the
|
||||
/// actual dialing attempt until after the ban expires, but the PSM will still consider the link
|
||||
/// to be established.
|
||||
/// Note that this "banning" system is not an actual ban. If a "banned" node tries to connect to
|
||||
/// us, we accept the connection. The "banning" system is only about delaying dialing attempts.
|
||||
///
|
||||
pub struct CustomProto<TMessage, TSubstream> {
|
||||
/// List of protocols to open with peers. Never modified.
|
||||
protocol: RegisteredProtocol<TMessage>,
|
||||
@@ -761,23 +790,50 @@ where
|
||||
match event {
|
||||
CustomProtoHandlerOut::CustomProtocolClosed { result } => {
|
||||
debug!(target: "sub-libp2p", "Handler({:?}) => Closed({:?})", source, result);
|
||||
match self.peers.get_mut(&source) {
|
||||
Some(PeerState::Enabled { ref mut open, .. }) if *open =>
|
||||
*open = false,
|
||||
Some(PeerState::Disabled { ref mut open, .. }) if *open =>
|
||||
*open = false,
|
||||
Some(PeerState::DisabledPendingEnable { ref mut open, .. }) if *open =>
|
||||
*open = false,
|
||||
_ => error!(target: "sub-libp2p", "State mismatch in the custom protos handler"),
|
||||
}
|
||||
|
||||
let mut entry = if let Entry::Occupied(entry) = self.peers.entry(source.clone()) {
|
||||
entry
|
||||
} else {
|
||||
error!(target: "sub-libp2p", "State mismatch in the custom protos handler");
|
||||
return
|
||||
};
|
||||
|
||||
debug!(target: "sub-libp2p", "External API <= Closed({:?})", source);
|
||||
let event = CustomProtoOut::CustomProtocolClosed {
|
||||
result,
|
||||
peer_id: source,
|
||||
peer_id: source.clone(),
|
||||
};
|
||||
|
||||
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
|
||||
|
||||
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
|
||||
PeerState::Enabled { open, connected_point } => {
|
||||
debug_assert!(open);
|
||||
|
||||
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", source);
|
||||
self.peerset.dropped(source.clone());
|
||||
|
||||
debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", source);
|
||||
self.events.push(NetworkBehaviourAction::SendEvent {
|
||||
peer_id: source.clone(),
|
||||
event: CustomProtoHandlerIn::Disable,
|
||||
});
|
||||
|
||||
*entry.into_mut() = PeerState::Disabled {
|
||||
open: false,
|
||||
connected_point,
|
||||
banned_until: None
|
||||
};
|
||||
},
|
||||
PeerState::Disabled { open, connected_point, banned_until } => {
|
||||
debug_assert!(open);
|
||||
*entry.into_mut() = PeerState::Disabled { open: false, connected_point, banned_until };
|
||||
},
|
||||
PeerState::DisabledPendingEnable { open, connected_point, timer } => {
|
||||
debug_assert!(open);
|
||||
*entry.into_mut() = PeerState::DisabledPendingEnable { open: false, connected_point, timer };
|
||||
},
|
||||
_ => error!(target: "sub-libp2p", "State mismatch in the custom protos handler"),
|
||||
}
|
||||
}
|
||||
|
||||
CustomProtoHandlerOut::CustomProtocolOpen { version } => {
|
||||
|
||||
@@ -36,10 +36,35 @@ use void::Void;
|
||||
/// Every time a connection with a remote starts, an instance of this struct is created and
|
||||
/// sent to a background task dedicated to this connection. Once the connection is established,
|
||||
/// it is turned into a `CustomProtoHandler`. It then handles all communications that are specific
|
||||
/// to Substrate on that connection.
|
||||
/// to Substrate on that single connection.
|
||||
///
|
||||
/// Note that there can be multiple instance of this struct simultaneously for same peer. However
|
||||
/// if that happens, only one main instance can communicate with the outer layers of the code.
|
||||
/// if that happens, only one main instance can communicate with the outer layers of the code. In
|
||||
/// other words, the outer layers of the code only ever see one handler.
|
||||
///
|
||||
/// ## State of the handler
|
||||
///
|
||||
/// There are six possible states for the handler:
|
||||
///
|
||||
/// - Enabled and open, which is a normal operation.
|
||||
/// - Enabled and closed, in which case it will try to open substreams.
|
||||
/// - Disabled and open, in which case it will try to close substreams.
|
||||
/// - Disabled and closed, in which case the handler is idle. The connection will be
|
||||
/// garbage-collected after a few seconds if nothing more happens.
|
||||
/// - Initializing and open.
|
||||
/// - Initializing and closed, which is the state the handler starts in.
|
||||
///
|
||||
/// The Init/Enabled/Disabled state is entirely controlled by the user by sending `Enable` or
|
||||
/// `Disable` messages to the handler. The handler itself never transitions automatically between
|
||||
/// these states. For example, if the handler reports a network misbehaviour, it will close the
|
||||
/// substreams but it is the role of the user to send a `Disabled` event if it wants the connection
|
||||
/// to close. Otherwise, the handler will try to reopen substreams.
|
||||
/// The handler starts in the "Initializing" state and must be transitionned to Enabled or Disabled
|
||||
/// as soon as possible.
|
||||
///
|
||||
/// The Open/Closed state is decided by the handler and is reported with the `CustomProtocolOpen`
|
||||
/// and `CustomProtocolClosed` events. The `CustomMessage` event can only be generated if the
|
||||
/// handler is open.
|
||||
///
|
||||
/// ## How it works
|
||||
///
|
||||
@@ -552,7 +577,7 @@ where
|
||||
return_value = Some(ProtocolsHandlerEvent::Custom(event));
|
||||
ProtocolState::Disabled {
|
||||
shutdown: shutdown.into_iter().collect(),
|
||||
reenable: false
|
||||
reenable: true
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -562,7 +587,7 @@ where
|
||||
return_value = Some(ProtocolsHandlerEvent::Custom(event));
|
||||
ProtocolState::Disabled {
|
||||
shutdown: shutdown.into_iter().collect(),
|
||||
reenable: false
|
||||
reenable: true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user