Add reason for closing and fix multiple substreams (#2437)

This commit is contained in:
Pierre Krieger
2019-05-01 09:42:54 +02:00
committed by Gavin Wood
parent 407511100c
commit dbff5c4962
3 changed files with 31 additions and 29 deletions
@@ -29,7 +29,7 @@ use libp2p::mdns::{Mdns, MdnsEvent};
use libp2p::multiaddr::Protocol; use libp2p::multiaddr::Protocol;
use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess}; use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess};
use log::{debug, info, trace, warn}; use log::{debug, info, trace, warn};
use std::{cmp, io, fmt, time::Duration}; use std::{borrow::Cow, cmp, fmt, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Delay, clock::Clock}; use tokio_timer::{Delay, clock::Clock};
use void; use void;
@@ -181,8 +181,8 @@ pub enum BehaviourOut<TMessage> {
CustomProtocolClosed { CustomProtocolClosed {
/// Id of the peer we were connected to. /// Id of the peer we were connected to.
peer_id: PeerId, peer_id: PeerId,
/// Reason why the substream closed. If `Ok`, then it's a graceful exit (EOF). /// Reason why the substream closed, for diagnostic purposes.
result: io::Result<()>, reason: Cow<'static, str>,
}, },
/// Receives a message on a custom protocol substream. /// Receives a message on a custom protocol substream.
@@ -224,8 +224,8 @@ impl<TMessage> From<CustomProtoOut<TMessage>> for BehaviourOut<TMessage> {
CustomProtoOut::CustomProtocolOpen { version, peer_id, endpoint } => { CustomProtoOut::CustomProtocolOpen { version, peer_id, endpoint } => {
BehaviourOut::CustomProtocolOpen { version, peer_id, endpoint } BehaviourOut::CustomProtocolOpen { version, peer_id, endpoint }
} }
CustomProtoOut::CustomProtocolClosed { peer_id, result } => { CustomProtoOut::CustomProtocolClosed { peer_id, reason } => {
BehaviourOut::CustomProtocolClosed { peer_id, result } BehaviourOut::CustomProtocolClosed { peer_id, reason }
} }
CustomProtoOut::CustomMessage { peer_id, message } => { CustomProtoOut::CustomMessage { peer_id, message } => {
BehaviourOut::CustomMessage { peer_id, message } BehaviourOut::CustomMessage { peer_id, message }
@@ -22,7 +22,7 @@ use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourActi
use libp2p::core::{Multiaddr, PeerId}; use libp2p::core::{Multiaddr, PeerId};
use log::{debug, error, trace, warn}; use log::{debug, error, trace, warn};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::{collections::hash_map::Entry, cmp, error, io, marker::PhantomData, mem, time::Duration, time::Instant}; use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::clock::Clock; use tokio_timer::clock::Clock;
@@ -182,8 +182,8 @@ pub enum CustomProtoOut<TMessage> {
CustomProtocolClosed { CustomProtocolClosed {
/// Id of the peer we were connected to. /// Id of the peer we were connected to.
peer_id: PeerId, peer_id: PeerId,
/// Reason why the substream closed. If `Ok`, then it's a graceful exit (EOF). /// Reason why the substream closed, for debugging purposes.
result: io::Result<()>, reason: Cow<'static, str>,
}, },
/// Receives a message on a custom protocol substream. /// Receives a message on a custom protocol substream.
@@ -696,7 +696,7 @@ where
debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id);
let event = CustomProtoOut::CustomProtocolClosed { let event = CustomProtoOut::CustomProtocolClosed {
peer_id: peer_id.clone(), peer_id: peer_id.clone(),
result: Ok(()), reason: "Disconnected by libp2p".into(),
}; };
self.events.push(NetworkBehaviourAction::GenerateEvent(event)); self.events.push(NetworkBehaviourAction::GenerateEvent(event));
@@ -713,7 +713,7 @@ where
debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id);
let event = CustomProtoOut::CustomProtocolClosed { let event = CustomProtoOut::CustomProtocolClosed {
peer_id: peer_id.clone(), peer_id: peer_id.clone(),
result: Ok(()), reason: "Disconnected by libp2p".into(),
}; };
self.events.push(NetworkBehaviourAction::GenerateEvent(event)); self.events.push(NetworkBehaviourAction::GenerateEvent(event));
@@ -730,7 +730,7 @@ where
debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id);
let event = CustomProtoOut::CustomProtocolClosed { let event = CustomProtoOut::CustomProtocolClosed {
peer_id: peer_id.clone(), peer_id: peer_id.clone(),
result: Ok(()), reason: "Disconnected by libp2p".into(),
}; };
self.events.push(NetworkBehaviourAction::GenerateEvent(event)); self.events.push(NetworkBehaviourAction::GenerateEvent(event));
@@ -802,8 +802,8 @@ where
event: CustomProtoHandlerOut<TMessage>, event: CustomProtoHandlerOut<TMessage>,
) { ) {
match event { match event {
CustomProtoHandlerOut::CustomProtocolClosed { result } => { CustomProtoHandlerOut::CustomProtocolClosed { reason } => {
debug!(target: "sub-libp2p", "Handler({:?}) => Closed({:?})", source, result); debug!(target: "sub-libp2p", "Handler({:?}) => Closed: {}", source, reason);
let mut entry = if let Entry::Occupied(entry) = self.peers.entry(source.clone()) { let mut entry = if let Entry::Occupied(entry) = self.peers.entry(source.clone()) {
entry entry
@@ -814,7 +814,7 @@ where
debug!(target: "sub-libp2p", "External API <= Closed({:?})", source); debug!(target: "sub-libp2p", "External API <= Closed({:?})", source);
let event = CustomProtoOut::CustomProtocolClosed { let event = CustomProtoOut::CustomProtocolClosed {
result, reason,
peer_id: source.clone(), peer_id: source.clone(),
}; };
self.events.push(NetworkBehaviourAction::GenerateEvent(event)); self.events.push(NetworkBehaviourAction::GenerateEvent(event));
@@ -27,7 +27,7 @@ use libp2p::core::{
}; };
use log::{debug, error}; use log::{debug, error};
use smallvec::{smallvec, SmallVec}; use smallvec::{smallvec, SmallVec};
use std::{error, fmt, io, marker::PhantomData, mem, time::Duration, time::Instant}; use std::{borrow::Cow, error, fmt, io, marker::PhantomData, mem, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Delay, clock::Clock}; use tokio_timer::{Delay, clock::Clock};
use void::Void; use void::Void;
@@ -231,8 +231,8 @@ pub enum CustomProtoHandlerOut<TMessage> {
/// Closed a custom protocol with the remote. /// Closed a custom protocol with the remote.
CustomProtocolClosed { CustomProtocolClosed {
/// Reason why the substream closed. If `Ok`, then it's a graceful exit (EOF). /// Reason why the substream closed, for diagnostic purposes.
result: io::Result<()>, reason: Cow<'static, str>,
}, },
/// Receives a message on a custom protocol substream. /// Receives a message on a custom protocol substream.
@@ -329,7 +329,7 @@ where
shutdown.push(substream); shutdown.push(substream);
} }
let event = CustomProtoHandlerOut::CustomProtocolClosed { let event = CustomProtoHandlerOut::CustomProtocolClosed {
result: Ok(()) reason: "Disabled on purpose on our side".into()
}; };
self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Disabled { ProtocolState::Disabled {
@@ -394,7 +394,7 @@ where
} }
} }
ProtocolState::Normal { mut substreams, shutdown } => { ProtocolState::Normal { mut substreams, mut shutdown } => {
for n in (0..substreams.len()).rev() { for n in (0..substreams.len()).rev() {
let mut substream = substreams.swap_remove(n); let mut substream = substreams.swap_remove(n);
match substream.poll() { match substream.poll() {
@@ -416,20 +416,22 @@ where
return Some(ProtocolsHandlerEvent::Custom(event)); return Some(ProtocolsHandlerEvent::Custom(event));
} }
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
let event = CustomProtoHandlerOut::CustomProtocolClosed { shutdown.push(substream);
result: Ok(()) if substreams.is_empty() {
}; let event = CustomProtoHandlerOut::CustomProtocolClosed {
substreams.push(substream); reason: "All substreams have been closed by the remote".into(),
self.state = ProtocolState::Disabled { };
shutdown: shutdown.into_iter().collect(), self.state = ProtocolState::Disabled {
reenable: true shutdown: shutdown.into_iter().collect(),
}; reenable: true
return Some(ProtocolsHandlerEvent::Custom(event)); };
return Some(ProtocolsHandlerEvent::Custom(event));
}
} }
Err(err) => { Err(err) => {
if substreams.is_empty() { if substreams.is_empty() {
let event = CustomProtoHandlerOut::CustomProtocolClosed { let event = CustomProtoHandlerOut::CustomProtocolClosed {
result: Err(err), reason: format!("Error on the last substream: {:?}", err).into(),
}; };
self.state = ProtocolState::Disabled { self.state = ProtocolState::Disabled {
shutdown: shutdown.into_iter().collect(), shutdown: shutdown.into_iter().collect(),