Add mitigation for the state inconsistency issue (#5996)

* Add mitigation for the state inconsistency issue

* Add logging
This commit is contained in:
Pierre Krieger
2020-05-13 14:54:59 +02:00
committed by GitHub
parent eb344a1739
commit 10492c0689
2 changed files with 75 additions and 17 deletions
@@ -981,14 +981,30 @@ impl NetworkBehaviour for GenericProto {
"`inject_disconnected` called for unknown peer {}",
peer_id),
Some(PeerState::Disabled { banned_until, .. }) => {
Some(PeerState::Disabled { open, banned_until, .. }) => {
if !open.is_empty() {
debug_assert!(false);
error!(
target: "sub-libp2p",
"State mismatch: disconnected from {} with non-empty list of connections",
peer_id
);
}
debug!(target: "sub-libp2p", "Libp2p => Disconnected({}): Was disabled.", peer_id);
if let Some(until) = banned_until {
self.peers.insert(peer_id.clone(), PeerState::Banned { until });
}
}
Some(PeerState::DisabledPendingEnable { timer_deadline, .. }) => {
Some(PeerState::DisabledPendingEnable { open, timer_deadline, .. }) => {
if !open.is_empty() {
debug_assert!(false);
error!(
target: "sub-libp2p",
"State mismatch: disconnected from {} with non-empty list of connections",
peer_id
);
}
debug!(target: "sub-libp2p",
"Libp2p => Disconnected({}): Was disabled but pending enable.",
peer_id);
@@ -997,7 +1013,15 @@ impl NetworkBehaviour for GenericProto {
self.peers.insert(peer_id.clone(), PeerState::Banned { until: timer_deadline });
}
Some(PeerState::Enabled { .. }) => {
Some(PeerState::Enabled { open, .. }) => {
if !open.is_empty() {
debug_assert!(false);
error!(
target: "sub-libp2p",
"State mismatch: disconnected from {} with non-empty list of connections",
peer_id
);
}
debug!(target: "sub-libp2p", "Libp2p => Disconnected({}): Was enabled.", peer_id);
debug!(target: "sub-libp2p", "PSM <= Dropped({})", peer_id);
self.peerset.dropped(peer_id.clone());
@@ -1088,8 +1112,16 @@ impl NetworkBehaviour for GenericProto {
let last = match mem::replace(entry.get_mut(), PeerState::Poisoned) {
PeerState::Enabled { mut open } => {
debug_assert!(open.iter().any(|c| c == &connection));
open.retain(|c| c != &connection);
if let Some(pos) = open.iter().position(|c| c == &connection) {
open.remove(pos);
} else {
debug_assert!(false);
error!(
target: "sub-libp2p",
"State mismatch with {}: unknown closed connection",
source
);
}
debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", source);
self.events.push(NetworkBehaviourAction::NotifyHandler {
@@ -1114,8 +1146,17 @@ impl NetworkBehaviour for GenericProto {
last
},
PeerState::Disabled { mut open, banned_until } => {
debug_assert!(open.iter().any(|c| c == &connection));
open.retain(|c| c != &connection);
if let Some(pos) = open.iter().position(|c| c == &connection) {
open.remove(pos);
} else {
debug_assert!(false);
error!(
target: "sub-libp2p",
"State mismatch with {}: unknown closed connection",
source
);
}
let last = open.is_empty();
*entry.into_mut() = PeerState::Disabled {
open,
@@ -1128,8 +1169,17 @@ impl NetworkBehaviour for GenericProto {
timer,
timer_deadline
} => {
debug_assert!(open.iter().any(|c| c == &connection));
open.retain(|c| c != &connection);
if let Some(pos) = open.iter().position(|c| c == &connection) {
open.remove(pos);
} else {
debug_assert!(false);
error!(
target: "sub-libp2p",
"State mismatch with {}: unknown closed connection",
source
);
}
let last = open.is_empty();
*entry.into_mut() = PeerState::DisabledPendingEnable {
open,
@@ -1168,7 +1218,15 @@ impl NetworkBehaviour for GenericProto {
Some(PeerState::DisabledPendingEnable { ref mut open, .. }) |
Some(PeerState::Disabled { ref mut open, .. }) => {
let first = open.is_empty();
open.push(connection);
if !open.iter().any(|c| *c == connection) {
open.push(connection);
} else {
error!(
target: "sub-libp2p",
"State mismatch: connection with {} opened a second time",
source
);
}
first
}
state => {
@@ -115,7 +115,7 @@ impl IntoProtocolsHandler for LegacyProtoHandlerProto {
remote_peer_id: remote_peer_id.clone(),
state: ProtocolState::Init {
substreams: SmallVec::new(),
init_deadline: Delay::new(Duration::from_secs(5))
init_deadline: Delay::new(Duration::from_secs(20))
},
events_queue: SmallVec::new(),
}
@@ -353,26 +353,26 @@ impl LegacyProtoHandler {
ProtocolState::Init { substreams, mut init_deadline } => {
match Pin::new(&mut init_deadline).poll(cx) {
Poll::Ready(()) => {
init_deadline = Delay::new(Duration::from_secs(60));
error!(target: "sub-libp2p", "Handler initialization process is too long \
with {:?}", self.remote_peer_id)
with {:?}", self.remote_peer_id);
self.state = ProtocolState::KillAsap;
},
Poll::Pending => {}
Poll::Pending => {
self.state = ProtocolState::Init { substreams, init_deadline };
}
}
self.state = ProtocolState::Init { substreams, init_deadline };
None
}
ProtocolState::Opening { mut deadline } => {
match Pin::new(&mut deadline).poll(cx) {
Poll::Ready(()) => {
deadline = Delay::new(Duration::from_secs(60));
let event = LegacyProtoHandlerOut::ProtocolError {
is_severe: true,
error: "Timeout when opening protocol".to_string().into(),
};
self.state = ProtocolState::Opening { deadline };
self.state = ProtocolState::KillAsap;
Some(ProtocolsHandlerEvent::Custom(event))
},
Poll::Pending => {