diff --git a/substrate/core/network/src/custom_proto/behaviour.rs b/substrate/core/network/src/custom_proto/behaviour.rs index 61fd9c3e7b..babd93f932 100644 --- a/substrate/core/network/src/custom_proto/behaviour.rs +++ b/substrate/core/network/src/custom_proto/behaviour.rs @@ -16,13 +16,15 @@ use crate::{DiscoveryNetBehaviour, config::ProtocolId}; use crate::custom_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOut, CustomProtoHandlerIn}; -use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol}; +use crate::custom_proto::upgrade::RegisteredProtocol; +use crate::protocol::message::Message; use fnv::FnvHashMap; use futures::prelude::*; use futures03::{compat::Compat, TryFutureExt as _, StreamExt as _, TryStreamExt as _}; use libp2p::core::{ConnectedPoint, Multiaddr, PeerId}; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use log::{debug, error, trace, warn}; +use runtime_primitives::traits::Block as BlockT; use smallvec::SmallVec; use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem, pin::Pin}; use std::time::{Duration, Instant}; @@ -58,9 +60,9 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// Note that this "banning" system is not an actual ban. If a "banned" node tries to connect to /// us, we accept the connection. The "banning" system is only about delaying dialing attempts. /// -pub struct CustomProto { +pub struct CustomProto { /// List of protocols to open with peers. Never modified. - protocol: RegisteredProtocol, + protocol: RegisteredProtocol, /// Receiver for instructions about who to connect to or disconnect from. peerset: peerset::Peerset, @@ -77,7 +79,7 @@ pub struct CustomProto { next_incoming_index: peerset::IncomingIndex, /// Events to produce from `poll()`. - events: SmallVec<[NetworkBehaviourAction, CustomProtoOut>; 4]>, + events: SmallVec<[NetworkBehaviourAction, CustomProtoOut>; 4]>, /// Marker to pin the generics. marker: PhantomData, @@ -186,7 +188,7 @@ struct IncomingPeer { /// Event that can be emitted by the `CustomProto`. #[derive(Debug)] -pub enum CustomProtoOut { +pub enum CustomProtoOut { /// Opened a custom protocol with the remote. CustomProtocolOpen { /// Version of the protocol that has been opened. @@ -210,7 +212,7 @@ pub enum CustomProtoOut { /// Id of the peer the message came from. peer_id: PeerId, /// Message that has been received. - message: TMessage, + message: Message, }, /// The substream used by the protocol is pretty large. We should print avoid sending more @@ -219,11 +221,11 @@ pub enum CustomProtoOut { /// Id of the peer which is clogged. peer_id: PeerId, /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec, + messages: Vec>, }, } -impl CustomProto { +impl CustomProto { /// Creates a `CustomProtos`. pub fn new( protocol: impl Into, @@ -347,7 +349,8 @@ impl CustomProto { /// /// Also note that even we have a valid open substream, it may in fact be already closed /// without us knowing, in which case the packet will not be received. - pub fn send_packet(&mut self, target: &PeerId, message: TMessage) { + pub fn send_packet(&mut self, target: &PeerId, message: Message) + where B: BlockT { if !self.is_open(target) { return; } @@ -603,7 +606,7 @@ impl CustomProto { } } -impl DiscoveryNetBehaviour for CustomProto { +impl DiscoveryNetBehaviour for CustomProto { fn add_discovered_nodes(&mut self, peer_ids: impl Iterator) { self.peerset.discovered(peer_ids.into_iter().map(|peer_id| { debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id); @@ -612,13 +615,13 @@ impl DiscoveryNetBehaviour for CustomProto NetworkBehaviour for CustomProto +impl NetworkBehaviour for CustomProto where TSubstream: AsyncRead + AsyncWrite, - TMessage: CustomMessage, + B: BlockT, { - type ProtocolsHandler = CustomProtoHandlerProto; - type OutEvent = CustomProtoOut; + type ProtocolsHandler = CustomProtoHandlerProto; + type OutEvent = CustomProtoOut; fn new_handler(&mut self) -> Self::ProtocolsHandler { CustomProtoHandlerProto::new(self.protocol.clone()) @@ -816,7 +819,7 @@ where fn inject_node_event( &mut self, source: PeerId, - event: CustomProtoHandlerOut, + event: CustomProtoHandlerOut, ) { match event { CustomProtoHandlerOut::CustomProtocolClosed { reason } => { @@ -945,7 +948,7 @@ where _params: &mut impl PollParameters, ) -> Async< NetworkBehaviourAction< - CustomProtoHandlerIn, + CustomProtoHandlerIn, Self::OutEvent, >, > { diff --git a/substrate/core/network/src/custom_proto/handler.rs b/substrate/core/network/src/custom_proto/handler.rs index 6609998242..6904d18c4b 100644 --- a/substrate/core/network/src/custom_proto/handler.rs +++ b/substrate/core/network/src/custom_proto/handler.rs @@ -14,8 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol}; -use crate::custom_proto::upgrade::{RegisteredProtocolEvent, RegisteredProtocolSubstream}; +use crate::custom_proto::upgrade::{RegisteredProtocol, RegisteredProtocolEvent, RegisteredProtocolSubstream}; +use crate::protocol::message::Message; use futures::prelude::*; use futures03::{compat::Compat, TryFutureExt as _}; use futures_timer::Delay; @@ -29,6 +29,7 @@ use libp2p::swarm::{ SubstreamProtocol, }; use log::{debug, error}; +use runtime_primitives::traits::Block as BlockT; use smallvec::{smallvec, SmallVec}; use std::{borrow::Cow, error, fmt, io, marker::PhantomData, mem, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -87,21 +88,21 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// We consider that we are now "closed" if the remote closes all the existing substreams. /// Re-opening it can then be performed by closing all active substream and re-opening one. /// -pub struct CustomProtoHandlerProto { +pub struct CustomProtoHandlerProto { /// Configuration for the protocol upgrade to negotiate. - protocol: RegisteredProtocol, + protocol: RegisteredProtocol, /// Marker to pin the generic type. marker: PhantomData, } -impl CustomProtoHandlerProto +impl CustomProtoHandlerProto where TSubstream: AsyncRead + AsyncWrite, - TMessage: CustomMessage, + B: BlockT, { /// Builds a new `CustomProtoHandlerProto`. - pub fn new(protocol: RegisteredProtocol) -> Self { + pub fn new(protocol: RegisteredProtocol) -> Self { CustomProtoHandlerProto { protocol, marker: PhantomData, @@ -109,14 +110,14 @@ where } } -impl IntoProtocolsHandler for CustomProtoHandlerProto +impl IntoProtocolsHandler for CustomProtoHandlerProto where TSubstream: AsyncRead + AsyncWrite, - TMessage: CustomMessage, + B: BlockT, { - type Handler = CustomProtoHandler; + type Handler = CustomProtoHandler; - fn inbound_protocol(&self) -> RegisteredProtocol { + fn inbound_protocol(&self) -> RegisteredProtocol { self.protocol.clone() } @@ -135,12 +136,12 @@ where } /// The actual handler once the connection has been established. -pub struct CustomProtoHandler { +pub struct CustomProtoHandler { /// Configuration for the protocol upgrade to negotiate. - protocol: RegisteredProtocol, + protocol: RegisteredProtocol, /// State of the communications with the remote. - state: ProtocolState, + state: ProtocolState, /// Identifier of the node we're talking to. Used only for logging purposes and shouldn't have /// any influence on the behaviour. @@ -154,15 +155,15 @@ pub struct CustomProtoHandler { /// /// This queue must only ever be modified to insert elements at the back, or remove the first /// element. - events_queue: SmallVec<[ProtocolsHandlerEvent, (), CustomProtoHandlerOut>; 16]>, + events_queue: SmallVec<[ProtocolsHandlerEvent, (), CustomProtoHandlerOut>; 16]>, } /// State of the handler. -enum ProtocolState { +enum ProtocolState { /// Waiting for the behaviour to tell the handler whether it is enabled or disabled. Init { /// List of substreams opened by the remote but that haven't been processed yet. - substreams: SmallVec<[RegisteredProtocolSubstream; 6]>, + substreams: SmallVec<[RegisteredProtocolSubstream; 6]>, /// Deadline after which the initialization is abnormally long. init_deadline: Compat, }, @@ -178,9 +179,9 @@ enum ProtocolState { /// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside. Normal { /// The substreams where bidirectional communications happen. - substreams: SmallVec<[RegisteredProtocolSubstream; 4]>, + substreams: SmallVec<[RegisteredProtocolSubstream; 4]>, /// Contains substreams which are being shut down. - shutdown: SmallVec<[RegisteredProtocolSubstream; 4]>, + shutdown: SmallVec<[RegisteredProtocolSubstream; 4]>, }, /// We are disabled. Contains substreams that are being closed. @@ -188,7 +189,7 @@ enum ProtocolState { /// outside or we have never sent any `CustomProtocolOpen` in the first place. Disabled { /// List of substreams to shut down. - shutdown: SmallVec<[RegisteredProtocolSubstream; 6]>, + shutdown: SmallVec<[RegisteredProtocolSubstream; 6]>, /// If true, we should reactivate the handler after all the substreams in `shutdown` have /// been closed. @@ -209,7 +210,7 @@ enum ProtocolState { /// Event that can be received by a `CustomProtoHandler`. #[derive(Debug)] -pub enum CustomProtoHandlerIn { +pub enum CustomProtoHandlerIn { /// The node should start using custom protocols. Enable, @@ -219,13 +220,13 @@ pub enum CustomProtoHandlerIn { /// Sends a message through a custom protocol substream. SendCustomMessage { /// The message to send. - message: TMessage, + message: Message, }, } /// Event that can be emitted by a `CustomProtoHandler`. #[derive(Debug)] -pub enum CustomProtoHandlerOut { +pub enum CustomProtoHandlerOut { /// Opened a custom protocol with the remote. CustomProtocolOpen { /// Version of the protocol that has been opened. @@ -241,14 +242,14 @@ pub enum CustomProtoHandlerOut { /// Receives a message on a custom protocol substream. CustomMessage { /// Message that has been received. - message: TMessage, + message: Message, }, /// A substream to the remote is clogged. The send buffer is very large, and we should print /// a diagnostic message and/or avoid sending more data. Clogged { /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec, + messages: Vec>, }, /// An error has happened on the protocol level with this node. @@ -260,10 +261,10 @@ pub enum CustomProtoHandlerOut { }, } -impl CustomProtoHandler +impl CustomProtoHandler where TSubstream: AsyncRead + AsyncWrite, - TMessage: CustomMessage, + B: BlockT, { /// Enables the handler. fn enable(&mut self) { @@ -341,7 +342,7 @@ where /// Polls the state for events. Optionally returns an event to produce. #[must_use] fn poll_state(&mut self) - -> Option, (), CustomProtoHandlerOut>> { + -> Option, (), CustomProtoHandlerOut>> { match mem::replace(&mut self.state, ProtocolState::Poisoned) { ProtocolState::Poisoned => { error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state", @@ -470,7 +471,7 @@ where /// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`. fn inject_fully_negotiated( &mut self, - mut substream: RegisteredProtocolSubstream + mut substream: RegisteredProtocolSubstream ) { self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { ProtocolState::Poisoned => { @@ -515,7 +516,7 @@ where } /// Sends a message to the remote. - fn send_message(&mut self, message: TMessage) { + fn send_message(&mut self, message: Message) { match self.state { ProtocolState::Normal { ref mut substreams, .. } => substreams[0].send_message(message), @@ -526,14 +527,14 @@ where } } -impl ProtocolsHandler for CustomProtoHandler -where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { - type InEvent = CustomProtoHandlerIn; - type OutEvent = CustomProtoHandlerOut; +impl ProtocolsHandler for CustomProtoHandler +where TSubstream: AsyncRead + AsyncWrite, B: BlockT { + type InEvent = CustomProtoHandlerIn; + type OutEvent = CustomProtoHandlerOut; type Substream = TSubstream; type Error = ConnectionKillError; - type InboundProtocol = RegisteredProtocol; - type OutboundProtocol = RegisteredProtocol; + type InboundProtocol = RegisteredProtocol; + type OutboundProtocol = RegisteredProtocol; type OutboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { @@ -555,7 +556,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { self.inject_fully_negotiated(proto); } - fn inject_event(&mut self, message: CustomProtoHandlerIn) { + fn inject_event(&mut self, message: CustomProtoHandlerIn) { match message { CustomProtoHandlerIn::Disable => self.disable(), CustomProtoHandlerIn::Enable => self.enable(), @@ -612,7 +613,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { } } -impl fmt::Debug for CustomProtoHandler +impl fmt::Debug for CustomProtoHandler where TSubstream: AsyncRead + AsyncWrite, { @@ -624,9 +625,9 @@ where /// Given a list of substreams, tries to shut them down. The substreams that have been successfully /// shut down are removed from the list. -fn shutdown_list - (list: &mut SmallVec>>) -where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { +fn shutdown_list + (list: &mut SmallVec>>) +where TSubstream: AsyncRead + AsyncWrite, B: BlockT { 'outer: for n in (0..list.len()).rev() { let mut substream = list.swap_remove(n); loop { diff --git a/substrate/core/network/src/custom_proto/mod.rs b/substrate/core/network/src/custom_proto/mod.rs index 22c66c1654..99f4f0183a 100644 --- a/substrate/core/network/src/custom_proto/mod.rs +++ b/substrate/core/network/src/custom_proto/mod.rs @@ -15,7 +15,6 @@ // along with Substrate. If not, see . pub use self::behaviour::{CustomProto, CustomProtoOut}; -pub use self::upgrade::CustomMessage; mod behaviour; mod handler; diff --git a/substrate/core/network/src/custom_proto/tests.rs b/substrate/core/network/src/custom_proto/tests.rs index b7e72af898..94456b1a0f 100644 --- a/substrate/core/network/src/custom_proto/tests.rs +++ b/substrate/core/network/src/custom_proto/tests.rs @@ -25,15 +25,15 @@ use libp2p::{PeerId, Multiaddr, Transport}; use rand::seq::SliceRandom; use std::{io, time::Duration, time::Instant}; use test_client::runtime::Block; -use crate::message::{Message as MessageAlias, generic::Message}; -use crate::custom_proto::{CustomProto, CustomProtoOut, CustomMessage}; +use crate::message::generic::Message; +use crate::custom_proto::{CustomProto, CustomProtoOut}; /// Builds two nodes that have each other as bootstrap nodes. /// This is to be used only for testing, and a panic will happen if something goes wrong. -fn build_nodes() +fn build_nodes() -> ( - Swarm, CustomProtoWithAddr>, - Swarm, CustomProtoWithAddr> + Swarm, CustomProtoWithAddr>, + Swarm, CustomProtoWithAddr> ) { let mut out = Vec::with_capacity(2); @@ -100,29 +100,29 @@ fn build_nodes() } /// Wraps around the `CustomBehaviour` network behaviour, and adds hardcoded node addresses to it. -struct CustomProtoWithAddr { - inner: CustomProto>, +struct CustomProtoWithAddr { + inner: CustomProto>, addrs: Vec<(PeerId, Multiaddr)>, } -impl std::ops::Deref for CustomProtoWithAddr { - type Target = CustomProto>; +impl std::ops::Deref for CustomProtoWithAddr { + type Target = CustomProto>; fn deref(&self) -> &Self::Target { &self.inner } } -impl std::ops::DerefMut for CustomProtoWithAddr { +impl std::ops::DerefMut for CustomProtoWithAddr { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } } -impl NetworkBehaviour for CustomProtoWithAddr { +impl NetworkBehaviour for CustomProtoWithAddr { type ProtocolsHandler = - > as NetworkBehaviour>::ProtocolsHandler; - type OutEvent = > as NetworkBehaviour>::OutEvent; + > as NetworkBehaviour>::ProtocolsHandler; + type OutEvent = > as NetworkBehaviour>::OutEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { self.inner.new_handler() @@ -200,7 +200,7 @@ fn two_nodes_transfer_lots_of_packets() { // substreams allowed by the multiplexer. const NUM_PACKETS: u32 = 5000; - let (mut service1, mut service2) = build_nodes::>(); + let (mut service1, mut service2) = build_nodes(); let fut1 = future::poll_fn(move || -> io::Result<_> { loop { @@ -241,7 +241,7 @@ fn two_nodes_transfer_lots_of_packets() { #[test] fn basic_two_nodes_requests_in_parallel() { - let (mut service1, mut service2) = build_nodes::>(); + let (mut service1, mut service2) = build_nodes(); // Generate random messages with or without a request id. let mut to_send = { @@ -296,7 +296,7 @@ fn reconnect_after_disconnect() { // We connect two nodes together, then force a disconnect (through the API of the `Service`), // check that the disconnect worked, and finally check whether they successfully reconnect. - let (mut service1, mut service2) = build_nodes::>(); + let (mut service1, mut service2) = build_nodes(); // We use the `current_thread` runtime because it doesn't require us to have `'static` futures. let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap(); diff --git a/substrate/core/network/src/custom_proto/upgrade.rs b/substrate/core/network/src/custom_proto/upgrade.rs index 4cb6cb5dd9..c01de89cdd 100644 --- a/substrate/core/network/src/custom_proto/upgrade.rs +++ b/substrate/core/network/src/custom_proto/upgrade.rs @@ -15,12 +15,15 @@ // along with Substrate. If not, see . use crate::config::ProtocolId; +use crate::protocol::message::Message; use bytes::Bytes; use libp2p::core::{Negotiated, Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName}; use libp2p::tokio_codec::Framed; use log::warn; use std::{collections::VecDeque, io, marker::PhantomData, vec::IntoIter as VecIntoIter}; use futures::{prelude::*, future, stream}; +use parity_codec::{Decode, Encode}; +use runtime_primitives::traits::Block as BlockT; use tokio_io::{AsyncRead, AsyncWrite}; use unsigned_varint::codec::UviBytes; @@ -28,7 +31,7 @@ use unsigned_varint::codec::UviBytes; /// /// Note that "a single protocol" here refers to `par` for example. However /// each protocol can have multiple different versions for networking purposes. -pub struct RegisteredProtocol { +pub struct RegisteredProtocol { /// Id of the protocol for API purposes. id: ProtocolId, /// Base name of the protocol as advertised on the network. @@ -38,10 +41,10 @@ pub struct RegisteredProtocol { /// Ordered in descending order so that the best comes first. supported_versions: Vec, /// Marker to pin the generic. - marker: PhantomData, + marker: PhantomData, } -impl RegisteredProtocol { +impl RegisteredProtocol { /// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be /// passed inside the `RegisteredProtocolOutput`. pub fn new(protocol: impl Into, versions: &[u8]) @@ -64,7 +67,7 @@ impl RegisteredProtocol { } } -impl Clone for RegisteredProtocol { +impl Clone for RegisteredProtocol { fn clone(&self) -> Self { RegisteredProtocol { id: self.id.clone(), @@ -76,7 +79,7 @@ impl Clone for RegisteredProtocol { } /// Output of a `RegisteredProtocol` upgrade. -pub struct RegisteredProtocolSubstream { +pub struct RegisteredProtocolSubstream { /// If true, we are in the process of closing the sink. is_closing: bool, /// Whether the local node opened this substream (dialer), or we received this substream from @@ -94,10 +97,10 @@ pub struct RegisteredProtocolSubstream { /// unless the buffer empties then fills itself again. clogged_fuse: bool, /// Marker to pin the generic. - marker: PhantomData, + marker: PhantomData, } -impl RegisteredProtocolSubstream { +impl RegisteredProtocolSubstream { /// Returns the version of the protocol that was negotiated. pub fn protocol_version(&self) -> u8 { self.protocol_version @@ -121,43 +124,33 @@ impl RegisteredProtocolSubstream { } /// Sends a message to the substream. - pub fn send_message(&mut self, data: TMessage) - where TMessage: CustomMessage { + pub fn send_message(&mut self, data: Message) + where B: BlockT { if self.is_closing { return } - self.send_queue.push_back(data.into_bytes()); + self.send_queue.push_back(data.encode()); } } -/// Implemented on messages that can be sent or received on the network. -pub trait CustomMessage { - /// Turns a message into the raw bytes to send over the network. - fn into_bytes(self) -> Vec; - - /// Tries to parse `bytes` received from the network into a message. - fn from_bytes(bytes: &[u8]) -> Result - where Self: Sized; -} - /// Event produced by the `RegisteredProtocolSubstream`. #[derive(Debug, Clone)] -pub enum RegisteredProtocolEvent { +pub enum RegisteredProtocolEvent { /// Received a message from the remote. - Message(TMessage), + Message(Message), /// Diagnostic event indicating that the connection is clogged and we should avoid sending too /// many messages to it. Clogged { /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec, + messages: Vec>, }, } -impl Stream for RegisteredProtocolSubstream -where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { - type Item = RegisteredProtocolEvent; +impl Stream for RegisteredProtocolSubstream +where TSubstream: AsyncRead + AsyncWrite, B: BlockT { + type Item = RegisteredProtocolEvent; type Error = io::Error; fn poll(&mut self) -> Poll, Self::Error> { @@ -186,7 +179,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { self.clogged_fuse = true; return Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages: self.send_queue.iter() - .map(|m| CustomMessage::from_bytes(&m)) + .map(|m| Decode::decode(&mut &m[..]).ok_or(())) .filter_map(Result::ok) .collect(), }))) @@ -206,7 +199,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { // Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever. match self.inner.poll()? { Async::Ready(Some(data)) => { - let message = ::from_bytes(&data) + let message = as Decode>::decode(&mut &data[..]).ok_or(()) .map_err(|()| { warn!(target: "sub-libp2p", "Couldn't decode packet sent by the remote: {:?}", data); io::ErrorKind::InvalidData @@ -224,7 +217,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { } } -impl UpgradeInfo for RegisteredProtocol { +impl UpgradeInfo for RegisteredProtocol { type Info = RegisteredProtocolName; type InfoIter = VecIntoIter; @@ -259,10 +252,10 @@ impl ProtocolName for RegisteredProtocolName { } } -impl InboundUpgrade for RegisteredProtocol +impl InboundUpgrade for RegisteredProtocol where TSubstream: AsyncRead + AsyncWrite, { - type Output = RegisteredProtocolSubstream; + type Output = RegisteredProtocolSubstream; type Future = future::FutureResult; type Error = io::Error; @@ -290,7 +283,7 @@ where TSubstream: AsyncRead + AsyncWrite, } } -impl OutboundUpgrade for RegisteredProtocol +impl OutboundUpgrade for RegisteredProtocol where TSubstream: AsyncRead + AsyncWrite, { type Output = >::Output; diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index dcb4ea35d0..0d71955474 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -111,7 +111,7 @@ pub struct Protocol, H: ExHashT> { /// When asked for a proof of finality, we use this struct to build one. finality_proof_provider: Option>>, /// Handles opening the unique substream and sending and receiving raw messages. - behaviour: CustomProto, Substream>, + behaviour: CustomProto>, } /// A peer that we are connected to @@ -150,7 +150,7 @@ pub struct PeerInfo { } struct OnDemandIn<'a, B: BlockT> { - behaviour: &'a mut CustomProto, Substream>, + behaviour: &'a mut CustomProto>, peerset: peerset::PeersetHandle, } @@ -281,7 +281,7 @@ pub trait Context { /// Protocol context. struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { - behaviour: &'a mut CustomProto, Substream>, + behaviour: &'a mut CustomProto>, context_data: &'a mut ContextData, peerset_handle: &'a peerset::PeersetHandle, } @@ -289,7 +289,7 @@ struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> { fn new( context_data: &'a mut ContextData, - behaviour: &'a mut CustomProto, Substream>, + behaviour: &'a mut CustomProto>, peerset_handle: &'a peerset::PeersetHandle, ) -> Self { ProtocolContext { context_data, peerset_handle, behaviour } @@ -1479,7 +1479,7 @@ pub enum CustomMessageOutcome { } fn send_message( - behaviour: &mut CustomProto, Substream>, + behaviour: &mut CustomProto>, peers: &mut HashMap>, who: PeerId, mut message: Message, @@ -1500,7 +1500,7 @@ fn send_message( impl, H: ExHashT> NetworkBehaviour for Protocol { - type ProtocolsHandler = , Substream> as NetworkBehaviour>::ProtocolsHandler; + type ProtocolsHandler = > as NetworkBehaviour>::ProtocolsHandler; type OutEvent = CustomMessageOutcome; fn new_handler(&mut self) -> Self::ProtocolsHandler { diff --git a/substrate/core/network/src/protocol/message.rs b/substrate/core/network/src/protocol/message.rs index 7b9b684cd8..5aa47d9c26 100644 --- a/substrate/core/network/src/protocol/message.rs +++ b/substrate/core/network/src/protocol/message.rs @@ -125,7 +125,6 @@ pub struct RemoteReadResponse { /// Generic types. pub mod generic { - use crate::custom_proto::CustomMessage; use parity_codec::{Encode, Decode}; use runtime_primitives::Justification; use crate::config::Roles; @@ -210,18 +209,6 @@ pub mod generic { ChainSpecific(Vec), } - impl CustomMessage for Message - where Self: Decode + Encode - { - fn into_bytes(self) -> Vec { - self.encode() - } - - fn from_bytes(bytes: &[u8]) -> Result { - Decode::decode(&mut &bytes[..]).ok_or(()) - } - } - /// Status sent on connection. #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] pub struct Status {