Start the handler init timer later (#2041)

This commit is contained in:
Pierre Krieger
2019-03-20 09:29:58 +01:00
committed by Arkadiy Paronyan
parent cd7d877cd4
commit 98a88a7d42
2 changed files with 57 additions and 25 deletions
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::custom_proto::handler::{CustomProtoHandler, CustomProtoHandlerOut, CustomProtoHandlerIn};
use crate::custom_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOut, CustomProtoHandlerIn};
use crate::custom_proto::topology::NetTopology;
use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol};
use crate::{NetworkConfiguration, NonReservedPeerMode};
@@ -22,7 +22,7 @@ use crate::parse_str_addr;
use fnv::{FnvHashMap, FnvHashSet};
use futures::prelude::*;
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::core::{protocols_handler::ProtocolsHandler, Endpoint, Multiaddr, PeerId};
use libp2p::core::{Endpoint, Multiaddr, PeerId};
use log::{debug, trace, warn};
use smallvec::SmallVec;
use std::{cmp, error, io, marker::PhantomData, path::Path, time::Duration, time::Instant};
@@ -433,11 +433,11 @@ where
TSubstream: AsyncRead + AsyncWrite,
TMessage: CustomMessage,
{
type ProtocolsHandler = CustomProtoHandler<TMessage, TSubstream>;
type ProtocolsHandler = CustomProtoHandlerProto<TMessage, TSubstream>;
type OutEvent = CustomProtoOut<TMessage>;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
CustomProtoHandler::new(self.protocol.clone())
CustomProtoHandlerProto::new(self.protocol.clone())
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
@@ -575,7 +575,7 @@ where
fn inject_node_event(
&mut self,
source: PeerId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
event: CustomProtoHandlerOut<TMessage>,
) {
match event {
CustomProtoHandlerOut::CustomProtocolClosed { result } => {
@@ -639,7 +639,7 @@ where
params: &mut PollParameters,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
CustomProtoHandlerIn<TMessage>,
Self::OutEvent,
>,
> {
@@ -18,23 +18,25 @@ use crate::custom_proto::upgrade::{CustomMessage, CustomMessageId, RegisteredPro
use crate::custom_proto::upgrade::{RegisteredProtocolEvent, RegisteredProtocolSubstream};
use futures::prelude::*;
use libp2p::core::{
Endpoint, ProtocolsHandler, ProtocolsHandlerEvent,
PeerId, Endpoint, ProtocolsHandler, ProtocolsHandlerEvent,
protocols_handler::IntoProtocolsHandler,
protocols_handler::KeepAlive,
protocols_handler::ProtocolsHandlerUpgrErr,
upgrade::{InboundUpgrade, OutboundUpgrade}
};
use log::{debug, error, warn};
use smallvec::{smallvec, SmallVec};
use std::{error, fmt, io, mem, time::Duration, time::Instant};
use std::{error, fmt, io, marker::PhantomData, mem, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
use void::Void;
/// Implements the `ProtocolsHandler` trait of libp2p.
/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
/// Every time a connection with a remote is established, an instance of this struct is created and
/// sent to a background task dedicated to this connection. It handles all communications that are
/// specific to Substrate.
/// Every time a connection with a remote starts, an instance of this struct is created and
/// sent to a background task dedicated to this connection. Once the connection is established,
/// it is turned into a `CustomProtoHandler`. It then handles all communications that are specific
/// to Substrate on that connection.
///
/// Note that there can be multiple instance of this struct simultaneously for same peer. However
/// if that happens, only one main instance can communicate with the outer layers of the code.
@@ -62,6 +64,49 @@ use void::Void;
/// happens on one substream, we consider that we are disconnected. Re-enabling is performed by
/// opening an outbound substream.
///
pub struct CustomProtoHandlerProto<TMessage, TSubstream> {
/// Configuration for the protocol upgrade to negotiate.
protocol: RegisteredProtocol<TMessage>,
/// Marker to pin the generic type.
marker: PhantomData<TSubstream>,
}
impl<TMessage, TSubstream> CustomProtoHandlerProto<TMessage, TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
TMessage: CustomMessage,
{
/// Builds a new `CustomProtoHandlerProto`.
pub fn new(protocol: RegisteredProtocol<TMessage>) -> Self {
CustomProtoHandlerProto {
protocol,
marker: PhantomData,
}
}
}
impl<TMessage, TSubstream> IntoProtocolsHandler for CustomProtoHandlerProto<TMessage, TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
TMessage: CustomMessage,
{
type Handler = CustomProtoHandler<TMessage, TSubstream>;
fn into_handler(self, _: &PeerId) -> Self::Handler {
CustomProtoHandler {
protocol: self.protocol,
state: ProtocolState::Init {
substreams: SmallVec::new(),
init_deadline: Delay::new(Instant::now() + Duration::from_secs(5))
},
events_queue: SmallVec::new(),
warm_up_end: Instant::now() + Duration::from_secs(5),
}
}
}
/// The actual handler once the connection has been established.
pub struct CustomProtoHandler<TMessage, TSubstream> {
/// Configuration for the protocol upgrade to negotiate.
protocol: RegisteredProtocol<TMessage>,
@@ -311,19 +356,6 @@ where
TSubstream: AsyncRead + AsyncWrite,
TMessage: CustomMessage,
{
/// Builds a new `CustomProtoHandler`.
pub fn new(protocol: RegisteredProtocol<TMessage>) -> Self {
CustomProtoHandler {
protocol,
state: ProtocolState::Init {
substreams: SmallVec::new(),
init_deadline: Delay::new(Instant::now() + Duration::from_secs(5))
},
events_queue: SmallVec::new(),
warm_up_end: Instant::now() + Duration::from_secs(5),
}
}
/// Enables the handler.
fn enable(&mut self, endpoint: Endpoint) {
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {