mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
Stop being generic over the message (#3186)
This commit is contained in:
committed by
Bastian Köcher
parent
9370a4a6b6
commit
8c919e031b
@@ -16,13 +16,15 @@
|
||||
|
||||
use crate::{DiscoveryNetBehaviour, config::ProtocolId};
|
||||
use crate::custom_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOut, CustomProtoHandlerIn};
|
||||
use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol};
|
||||
use crate::custom_proto::upgrade::RegisteredProtocol;
|
||||
use crate::protocol::message::Message;
|
||||
use fnv::FnvHashMap;
|
||||
use futures::prelude::*;
|
||||
use futures03::{compat::Compat, TryFutureExt as _, StreamExt as _, TryStreamExt as _};
|
||||
use libp2p::core::{ConnectedPoint, Multiaddr, PeerId};
|
||||
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
|
||||
use log::{debug, error, trace, warn};
|
||||
use runtime_primitives::traits::Block as BlockT;
|
||||
use smallvec::SmallVec;
|
||||
use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem, pin::Pin};
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -58,9 +60,9 @@ use tokio_io::{AsyncRead, AsyncWrite};
|
||||
/// Note that this "banning" system is not an actual ban. If a "banned" node tries to connect to
|
||||
/// us, we accept the connection. The "banning" system is only about delaying dialing attempts.
|
||||
///
|
||||
pub struct CustomProto<TMessage, TSubstream> {
|
||||
pub struct CustomProto<B: BlockT, TSubstream> {
|
||||
/// List of protocols to open with peers. Never modified.
|
||||
protocol: RegisteredProtocol<TMessage>,
|
||||
protocol: RegisteredProtocol<B>,
|
||||
|
||||
/// Receiver for instructions about who to connect to or disconnect from.
|
||||
peerset: peerset::Peerset,
|
||||
@@ -77,7 +79,7 @@ pub struct CustomProto<TMessage, TSubstream> {
|
||||
next_incoming_index: peerset::IncomingIndex,
|
||||
|
||||
/// Events to produce from `poll()`.
|
||||
events: SmallVec<[NetworkBehaviourAction<CustomProtoHandlerIn<TMessage>, CustomProtoOut<TMessage>>; 4]>,
|
||||
events: SmallVec<[NetworkBehaviourAction<CustomProtoHandlerIn<B>, CustomProtoOut<B>>; 4]>,
|
||||
|
||||
/// Marker to pin the generics.
|
||||
marker: PhantomData<TSubstream>,
|
||||
@@ -186,7 +188,7 @@ struct IncomingPeer {
|
||||
|
||||
/// Event that can be emitted by the `CustomProto`.
|
||||
#[derive(Debug)]
|
||||
pub enum CustomProtoOut<TMessage> {
|
||||
pub enum CustomProtoOut<B: BlockT> {
|
||||
/// Opened a custom protocol with the remote.
|
||||
CustomProtocolOpen {
|
||||
/// Version of the protocol that has been opened.
|
||||
@@ -210,7 +212,7 @@ pub enum CustomProtoOut<TMessage> {
|
||||
/// Id of the peer the message came from.
|
||||
peer_id: PeerId,
|
||||
/// Message that has been received.
|
||||
message: TMessage,
|
||||
message: Message<B>,
|
||||
},
|
||||
|
||||
/// The substream used by the protocol is pretty large. We should print avoid sending more
|
||||
@@ -219,11 +221,11 @@ pub enum CustomProtoOut<TMessage> {
|
||||
/// Id of the peer which is clogged.
|
||||
peer_id: PeerId,
|
||||
/// Copy of the messages that are within the buffer, for further diagnostic.
|
||||
messages: Vec<TMessage>,
|
||||
messages: Vec<Message<B>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
|
||||
impl<B: BlockT, TSubstream> CustomProto<B, TSubstream> {
|
||||
/// Creates a `CustomProtos`.
|
||||
pub fn new(
|
||||
protocol: impl Into<ProtocolId>,
|
||||
@@ -347,7 +349,8 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
|
||||
///
|
||||
/// Also note that even we have a valid open substream, it may in fact be already closed
|
||||
/// without us knowing, in which case the packet will not be received.
|
||||
pub fn send_packet(&mut self, target: &PeerId, message: TMessage) {
|
||||
pub fn send_packet(&mut self, target: &PeerId, message: Message<B>)
|
||||
where B: BlockT {
|
||||
if !self.is_open(target) {
|
||||
return;
|
||||
}
|
||||
@@ -603,7 +606,7 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> DiscoveryNetBehaviour for CustomProto<TMessage, TSubstream> {
|
||||
impl<B: BlockT, TSubstream> DiscoveryNetBehaviour for CustomProto<B, TSubstream> {
|
||||
fn add_discovered_nodes(&mut self, peer_ids: impl Iterator<Item = PeerId>) {
|
||||
self.peerset.discovered(peer_ids.into_iter().map(|peer_id| {
|
||||
debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id);
|
||||
@@ -612,13 +615,13 @@ impl<TMessage, TSubstream> DiscoveryNetBehaviour for CustomProto<TMessage, TSubs
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> NetworkBehaviour for CustomProto<TMessage, TSubstream>
|
||||
impl<B, TSubstream> NetworkBehaviour for CustomProto<B, TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
TMessage: CustomMessage,
|
||||
B: BlockT,
|
||||
{
|
||||
type ProtocolsHandler = CustomProtoHandlerProto<TMessage, TSubstream>;
|
||||
type OutEvent = CustomProtoOut<TMessage>;
|
||||
type ProtocolsHandler = CustomProtoHandlerProto<B, TSubstream>;
|
||||
type OutEvent = CustomProtoOut<B>;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||
CustomProtoHandlerProto::new(self.protocol.clone())
|
||||
@@ -816,7 +819,7 @@ where
|
||||
fn inject_node_event(
|
||||
&mut self,
|
||||
source: PeerId,
|
||||
event: CustomProtoHandlerOut<TMessage>,
|
||||
event: CustomProtoHandlerOut<B>,
|
||||
) {
|
||||
match event {
|
||||
CustomProtoHandlerOut::CustomProtocolClosed { reason } => {
|
||||
@@ -945,7 +948,7 @@ where
|
||||
_params: &mut impl PollParameters,
|
||||
) -> Async<
|
||||
NetworkBehaviourAction<
|
||||
CustomProtoHandlerIn<TMessage>,
|
||||
CustomProtoHandlerIn<B>,
|
||||
Self::OutEvent,
|
||||
>,
|
||||
> {
|
||||
|
||||
@@ -14,8 +14,8 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol};
|
||||
use crate::custom_proto::upgrade::{RegisteredProtocolEvent, RegisteredProtocolSubstream};
|
||||
use crate::custom_proto::upgrade::{RegisteredProtocol, RegisteredProtocolEvent, RegisteredProtocolSubstream};
|
||||
use crate::protocol::message::Message;
|
||||
use futures::prelude::*;
|
||||
use futures03::{compat::Compat, TryFutureExt as _};
|
||||
use futures_timer::Delay;
|
||||
@@ -29,6 +29,7 @@ use libp2p::swarm::{
|
||||
SubstreamProtocol,
|
||||
};
|
||||
use log::{debug, error};
|
||||
use runtime_primitives::traits::Block as BlockT;
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
use std::{borrow::Cow, error, fmt, io, marker::PhantomData, mem, time::Duration};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
@@ -87,21 +88,21 @@ use tokio_io::{AsyncRead, AsyncWrite};
|
||||
/// We consider that we are now "closed" if the remote closes all the existing substreams.
|
||||
/// Re-opening it can then be performed by closing all active substream and re-opening one.
|
||||
///
|
||||
pub struct CustomProtoHandlerProto<TMessage, TSubstream> {
|
||||
pub struct CustomProtoHandlerProto<B, TSubstream> {
|
||||
/// Configuration for the protocol upgrade to negotiate.
|
||||
protocol: RegisteredProtocol<TMessage>,
|
||||
protocol: RegisteredProtocol<B>,
|
||||
|
||||
/// Marker to pin the generic type.
|
||||
marker: PhantomData<TSubstream>,
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> CustomProtoHandlerProto<TMessage, TSubstream>
|
||||
impl<B, TSubstream> CustomProtoHandlerProto<B, TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
TMessage: CustomMessage,
|
||||
B: BlockT,
|
||||
{
|
||||
/// Builds a new `CustomProtoHandlerProto`.
|
||||
pub fn new(protocol: RegisteredProtocol<TMessage>) -> Self {
|
||||
pub fn new(protocol: RegisteredProtocol<B>) -> Self {
|
||||
CustomProtoHandlerProto {
|
||||
protocol,
|
||||
marker: PhantomData,
|
||||
@@ -109,14 +110,14 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> IntoProtocolsHandler for CustomProtoHandlerProto<TMessage, TSubstream>
|
||||
impl<B, TSubstream> IntoProtocolsHandler for CustomProtoHandlerProto<B, TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
TMessage: CustomMessage,
|
||||
B: BlockT,
|
||||
{
|
||||
type Handler = CustomProtoHandler<TMessage, TSubstream>;
|
||||
type Handler = CustomProtoHandler<B, TSubstream>;
|
||||
|
||||
fn inbound_protocol(&self) -> RegisteredProtocol<TMessage> {
|
||||
fn inbound_protocol(&self) -> RegisteredProtocol<B> {
|
||||
self.protocol.clone()
|
||||
}
|
||||
|
||||
@@ -135,12 +136,12 @@ where
|
||||
}
|
||||
|
||||
/// The actual handler once the connection has been established.
|
||||
pub struct CustomProtoHandler<TMessage, TSubstream> {
|
||||
pub struct CustomProtoHandler<B: BlockT, TSubstream> {
|
||||
/// Configuration for the protocol upgrade to negotiate.
|
||||
protocol: RegisteredProtocol<TMessage>,
|
||||
protocol: RegisteredProtocol<B>,
|
||||
|
||||
/// State of the communications with the remote.
|
||||
state: ProtocolState<TMessage, TSubstream>,
|
||||
state: ProtocolState<B, TSubstream>,
|
||||
|
||||
/// Identifier of the node we're talking to. Used only for logging purposes and shouldn't have
|
||||
/// any influence on the behaviour.
|
||||
@@ -154,15 +155,15 @@ pub struct CustomProtoHandler<TMessage, TSubstream> {
|
||||
///
|
||||
/// This queue must only ever be modified to insert elements at the back, or remove the first
|
||||
/// element.
|
||||
events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol<TMessage>, (), CustomProtoHandlerOut<TMessage>>; 16]>,
|
||||
events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol<B>, (), CustomProtoHandlerOut<B>>; 16]>,
|
||||
}
|
||||
|
||||
/// State of the handler.
|
||||
enum ProtocolState<TMessage, TSubstream> {
|
||||
enum ProtocolState<B, TSubstream> {
|
||||
/// Waiting for the behaviour to tell the handler whether it is enabled or disabled.
|
||||
Init {
|
||||
/// List of substreams opened by the remote but that haven't been processed yet.
|
||||
substreams: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 6]>,
|
||||
substreams: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 6]>,
|
||||
/// Deadline after which the initialization is abnormally long.
|
||||
init_deadline: Compat<Delay>,
|
||||
},
|
||||
@@ -178,9 +179,9 @@ enum ProtocolState<TMessage, TSubstream> {
|
||||
/// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside.
|
||||
Normal {
|
||||
/// The substreams where bidirectional communications happen.
|
||||
substreams: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 4]>,
|
||||
substreams: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 4]>,
|
||||
/// Contains substreams which are being shut down.
|
||||
shutdown: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 4]>,
|
||||
shutdown: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 4]>,
|
||||
},
|
||||
|
||||
/// We are disabled. Contains substreams that are being closed.
|
||||
@@ -188,7 +189,7 @@ enum ProtocolState<TMessage, TSubstream> {
|
||||
/// outside or we have never sent any `CustomProtocolOpen` in the first place.
|
||||
Disabled {
|
||||
/// List of substreams to shut down.
|
||||
shutdown: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 6]>,
|
||||
shutdown: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 6]>,
|
||||
|
||||
/// If true, we should reactivate the handler after all the substreams in `shutdown` have
|
||||
/// been closed.
|
||||
@@ -209,7 +210,7 @@ enum ProtocolState<TMessage, TSubstream> {
|
||||
|
||||
/// Event that can be received by a `CustomProtoHandler`.
|
||||
#[derive(Debug)]
|
||||
pub enum CustomProtoHandlerIn<TMessage> {
|
||||
pub enum CustomProtoHandlerIn<B: BlockT> {
|
||||
/// The node should start using custom protocols.
|
||||
Enable,
|
||||
|
||||
@@ -219,13 +220,13 @@ pub enum CustomProtoHandlerIn<TMessage> {
|
||||
/// Sends a message through a custom protocol substream.
|
||||
SendCustomMessage {
|
||||
/// The message to send.
|
||||
message: TMessage,
|
||||
message: Message<B>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Event that can be emitted by a `CustomProtoHandler`.
|
||||
#[derive(Debug)]
|
||||
pub enum CustomProtoHandlerOut<TMessage> {
|
||||
pub enum CustomProtoHandlerOut<B: BlockT> {
|
||||
/// Opened a custom protocol with the remote.
|
||||
CustomProtocolOpen {
|
||||
/// Version of the protocol that has been opened.
|
||||
@@ -241,14 +242,14 @@ pub enum CustomProtoHandlerOut<TMessage> {
|
||||
/// Receives a message on a custom protocol substream.
|
||||
CustomMessage {
|
||||
/// Message that has been received.
|
||||
message: TMessage,
|
||||
message: Message<B>,
|
||||
},
|
||||
|
||||
/// A substream to the remote is clogged. The send buffer is very large, and we should print
|
||||
/// a diagnostic message and/or avoid sending more data.
|
||||
Clogged {
|
||||
/// Copy of the messages that are within the buffer, for further diagnostic.
|
||||
messages: Vec<TMessage>,
|
||||
messages: Vec<Message<B>>,
|
||||
},
|
||||
|
||||
/// An error has happened on the protocol level with this node.
|
||||
@@ -260,10 +261,10 @@ pub enum CustomProtoHandlerOut<TMessage> {
|
||||
},
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> CustomProtoHandler<TMessage, TSubstream>
|
||||
impl<B, TSubstream> CustomProtoHandler<B, TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
TMessage: CustomMessage,
|
||||
B: BlockT,
|
||||
{
|
||||
/// Enables the handler.
|
||||
fn enable(&mut self) {
|
||||
@@ -341,7 +342,7 @@ where
|
||||
/// Polls the state for events. Optionally returns an event to produce.
|
||||
#[must_use]
|
||||
fn poll_state(&mut self)
|
||||
-> Option<ProtocolsHandlerEvent<RegisteredProtocol<TMessage>, (), CustomProtoHandlerOut<TMessage>>> {
|
||||
-> Option<ProtocolsHandlerEvent<RegisteredProtocol<B>, (), CustomProtoHandlerOut<B>>> {
|
||||
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
||||
ProtocolState::Poisoned => {
|
||||
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
|
||||
@@ -470,7 +471,7 @@ where
|
||||
/// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`.
|
||||
fn inject_fully_negotiated(
|
||||
&mut self,
|
||||
mut substream: RegisteredProtocolSubstream<TMessage, TSubstream>
|
||||
mut substream: RegisteredProtocolSubstream<B, TSubstream>
|
||||
) {
|
||||
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
||||
ProtocolState::Poisoned => {
|
||||
@@ -515,7 +516,7 @@ where
|
||||
}
|
||||
|
||||
/// Sends a message to the remote.
|
||||
fn send_message(&mut self, message: TMessage) {
|
||||
fn send_message(&mut self, message: Message<B>) {
|
||||
match self.state {
|
||||
ProtocolState::Normal { ref mut substreams, .. } =>
|
||||
substreams[0].send_message(message),
|
||||
@@ -526,14 +527,14 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> ProtocolsHandler for CustomProtoHandler<TMessage, TSubstream>
|
||||
where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
|
||||
type InEvent = CustomProtoHandlerIn<TMessage>;
|
||||
type OutEvent = CustomProtoHandlerOut<TMessage>;
|
||||
impl<B, TSubstream> ProtocolsHandler for CustomProtoHandler<B, TSubstream>
|
||||
where TSubstream: AsyncRead + AsyncWrite, B: BlockT {
|
||||
type InEvent = CustomProtoHandlerIn<B>;
|
||||
type OutEvent = CustomProtoHandlerOut<B>;
|
||||
type Substream = TSubstream;
|
||||
type Error = ConnectionKillError;
|
||||
type InboundProtocol = RegisteredProtocol<TMessage>;
|
||||
type OutboundProtocol = RegisteredProtocol<TMessage>;
|
||||
type InboundProtocol = RegisteredProtocol<B>;
|
||||
type OutboundProtocol = RegisteredProtocol<B>;
|
||||
type OutboundOpenInfo = ();
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
||||
@@ -555,7 +556,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
|
||||
self.inject_fully_negotiated(proto);
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, message: CustomProtoHandlerIn<TMessage>) {
|
||||
fn inject_event(&mut self, message: CustomProtoHandlerIn<B>) {
|
||||
match message {
|
||||
CustomProtoHandlerIn::Disable => self.disable(),
|
||||
CustomProtoHandlerIn::Enable => self.enable(),
|
||||
@@ -612,7 +613,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> fmt::Debug for CustomProtoHandler<TMessage, TSubstream>
|
||||
impl<B: BlockT, TSubstream> fmt::Debug for CustomProtoHandler<B, TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
{
|
||||
@@ -624,9 +625,9 @@ where
|
||||
|
||||
/// Given a list of substreams, tries to shut them down. The substreams that have been successfully
|
||||
/// shut down are removed from the list.
|
||||
fn shutdown_list<TMessage, TSubstream>
|
||||
(list: &mut SmallVec<impl smallvec::Array<Item = RegisteredProtocolSubstream<TMessage, TSubstream>>>)
|
||||
where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
|
||||
fn shutdown_list<B, TSubstream>
|
||||
(list: &mut SmallVec<impl smallvec::Array<Item = RegisteredProtocolSubstream<B, TSubstream>>>)
|
||||
where TSubstream: AsyncRead + AsyncWrite, B: BlockT {
|
||||
'outer: for n in (0..list.len()).rev() {
|
||||
let mut substream = list.swap_remove(n);
|
||||
loop {
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
pub use self::behaviour::{CustomProto, CustomProtoOut};
|
||||
pub use self::upgrade::CustomMessage;
|
||||
|
||||
mod behaviour;
|
||||
mod handler;
|
||||
|
||||
@@ -25,15 +25,15 @@ use libp2p::{PeerId, Multiaddr, Transport};
|
||||
use rand::seq::SliceRandom;
|
||||
use std::{io, time::Duration, time::Instant};
|
||||
use test_client::runtime::Block;
|
||||
use crate::message::{Message as MessageAlias, generic::Message};
|
||||
use crate::custom_proto::{CustomProto, CustomProtoOut, CustomMessage};
|
||||
use crate::message::generic::Message;
|
||||
use crate::custom_proto::{CustomProto, CustomProtoOut};
|
||||
|
||||
/// Builds two nodes that have each other as bootstrap nodes.
|
||||
/// This is to be used only for testing, and a panic will happen if something goes wrong.
|
||||
fn build_nodes<T: CustomMessage + Send + 'static>()
|
||||
fn build_nodes()
|
||||
-> (
|
||||
Swarm<Boxed<(PeerId, StreamMuxerBox), io::Error>, CustomProtoWithAddr<T>>,
|
||||
Swarm<Boxed<(PeerId, StreamMuxerBox), io::Error>, CustomProtoWithAddr<T>>
|
||||
Swarm<Boxed<(PeerId, StreamMuxerBox), io::Error>, CustomProtoWithAddr>,
|
||||
Swarm<Boxed<(PeerId, StreamMuxerBox), io::Error>, CustomProtoWithAddr>
|
||||
) {
|
||||
let mut out = Vec::with_capacity(2);
|
||||
|
||||
@@ -100,29 +100,29 @@ fn build_nodes<T: CustomMessage + Send + 'static>()
|
||||
}
|
||||
|
||||
/// Wraps around the `CustomBehaviour` network behaviour, and adds hardcoded node addresses to it.
|
||||
struct CustomProtoWithAddr<T: CustomMessage + Send + 'static> {
|
||||
inner: CustomProto<T, Substream<StreamMuxerBox>>,
|
||||
struct CustomProtoWithAddr {
|
||||
inner: CustomProto<Block, Substream<StreamMuxerBox>>,
|
||||
addrs: Vec<(PeerId, Multiaddr)>,
|
||||
}
|
||||
|
||||
impl<T: CustomMessage + Send + 'static> std::ops::Deref for CustomProtoWithAddr<T> {
|
||||
type Target = CustomProto<T, Substream<StreamMuxerBox>>;
|
||||
impl std::ops::Deref for CustomProtoWithAddr {
|
||||
type Target = CustomProto<Block, Substream<StreamMuxerBox>>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: CustomMessage + Send + 'static> std::ops::DerefMut for CustomProtoWithAddr<T> {
|
||||
impl std::ops::DerefMut for CustomProtoWithAddr {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: CustomMessage + Send + 'static> NetworkBehaviour for CustomProtoWithAddr<T> {
|
||||
impl NetworkBehaviour for CustomProtoWithAddr {
|
||||
type ProtocolsHandler =
|
||||
<CustomProto<T, Substream<StreamMuxerBox>> as NetworkBehaviour>::ProtocolsHandler;
|
||||
type OutEvent = <CustomProto<T, Substream<StreamMuxerBox>> as NetworkBehaviour>::OutEvent;
|
||||
<CustomProto<Block, Substream<StreamMuxerBox>> as NetworkBehaviour>::ProtocolsHandler;
|
||||
type OutEvent = <CustomProto<Block, Substream<StreamMuxerBox>> as NetworkBehaviour>::OutEvent;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||
self.inner.new_handler()
|
||||
@@ -200,7 +200,7 @@ fn two_nodes_transfer_lots_of_packets() {
|
||||
// substreams allowed by the multiplexer.
|
||||
const NUM_PACKETS: u32 = 5000;
|
||||
|
||||
let (mut service1, mut service2) = build_nodes::<MessageAlias<Block>>();
|
||||
let (mut service1, mut service2) = build_nodes();
|
||||
|
||||
let fut1 = future::poll_fn(move || -> io::Result<_> {
|
||||
loop {
|
||||
@@ -241,7 +241,7 @@ fn two_nodes_transfer_lots_of_packets() {
|
||||
|
||||
#[test]
|
||||
fn basic_two_nodes_requests_in_parallel() {
|
||||
let (mut service1, mut service2) = build_nodes::<MessageAlias<Block>>();
|
||||
let (mut service1, mut service2) = build_nodes();
|
||||
|
||||
// Generate random messages with or without a request id.
|
||||
let mut to_send = {
|
||||
@@ -296,7 +296,7 @@ fn reconnect_after_disconnect() {
|
||||
// We connect two nodes together, then force a disconnect (through the API of the `Service`),
|
||||
// check that the disconnect worked, and finally check whether they successfully reconnect.
|
||||
|
||||
let (mut service1, mut service2) = build_nodes::<MessageAlias<Block>>();
|
||||
let (mut service1, mut service2) = build_nodes();
|
||||
|
||||
// We use the `current_thread` runtime because it doesn't require us to have `'static` futures.
|
||||
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
|
||||
|
||||
@@ -15,12 +15,15 @@
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::config::ProtocolId;
|
||||
use crate::protocol::message::Message;
|
||||
use bytes::Bytes;
|
||||
use libp2p::core::{Negotiated, Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
|
||||
use libp2p::tokio_codec::Framed;
|
||||
use log::warn;
|
||||
use std::{collections::VecDeque, io, marker::PhantomData, vec::IntoIter as VecIntoIter};
|
||||
use futures::{prelude::*, future, stream};
|
||||
use parity_codec::{Decode, Encode};
|
||||
use runtime_primitives::traits::Block as BlockT;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use unsigned_varint::codec::UviBytes;
|
||||
|
||||
@@ -28,7 +31,7 @@ use unsigned_varint::codec::UviBytes;
|
||||
///
|
||||
/// Note that "a single protocol" here refers to `par` for example. However
|
||||
/// each protocol can have multiple different versions for networking purposes.
|
||||
pub struct RegisteredProtocol<TMessage> {
|
||||
pub struct RegisteredProtocol<B> {
|
||||
/// Id of the protocol for API purposes.
|
||||
id: ProtocolId,
|
||||
/// Base name of the protocol as advertised on the network.
|
||||
@@ -38,10 +41,10 @@ pub struct RegisteredProtocol<TMessage> {
|
||||
/// Ordered in descending order so that the best comes first.
|
||||
supported_versions: Vec<u8>,
|
||||
/// Marker to pin the generic.
|
||||
marker: PhantomData<TMessage>,
|
||||
marker: PhantomData<B>,
|
||||
}
|
||||
|
||||
impl<TMessage> RegisteredProtocol<TMessage> {
|
||||
impl<B> RegisteredProtocol<B> {
|
||||
/// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be
|
||||
/// passed inside the `RegisteredProtocolOutput`.
|
||||
pub fn new(protocol: impl Into<ProtocolId>, versions: &[u8])
|
||||
@@ -64,7 +67,7 @@ impl<TMessage> RegisteredProtocol<TMessage> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage> Clone for RegisteredProtocol<TMessage> {
|
||||
impl<B> Clone for RegisteredProtocol<B> {
|
||||
fn clone(&self) -> Self {
|
||||
RegisteredProtocol {
|
||||
id: self.id.clone(),
|
||||
@@ -76,7 +79,7 @@ impl<TMessage> Clone for RegisteredProtocol<TMessage> {
|
||||
}
|
||||
|
||||
/// Output of a `RegisteredProtocol` upgrade.
|
||||
pub struct RegisteredProtocolSubstream<TMessage, TSubstream> {
|
||||
pub struct RegisteredProtocolSubstream<B, TSubstream> {
|
||||
/// If true, we are in the process of closing the sink.
|
||||
is_closing: bool,
|
||||
/// Whether the local node opened this substream (dialer), or we received this substream from
|
||||
@@ -94,10 +97,10 @@ pub struct RegisteredProtocolSubstream<TMessage, TSubstream> {
|
||||
/// unless the buffer empties then fills itself again.
|
||||
clogged_fuse: bool,
|
||||
/// Marker to pin the generic.
|
||||
marker: PhantomData<TMessage>,
|
||||
marker: PhantomData<B>,
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> RegisteredProtocolSubstream<TMessage, TSubstream> {
|
||||
impl<B, TSubstream> RegisteredProtocolSubstream<B, TSubstream> {
|
||||
/// Returns the version of the protocol that was negotiated.
|
||||
pub fn protocol_version(&self) -> u8 {
|
||||
self.protocol_version
|
||||
@@ -121,43 +124,33 @@ impl<TMessage, TSubstream> RegisteredProtocolSubstream<TMessage, TSubstream> {
|
||||
}
|
||||
|
||||
/// Sends a message to the substream.
|
||||
pub fn send_message(&mut self, data: TMessage)
|
||||
where TMessage: CustomMessage {
|
||||
pub fn send_message(&mut self, data: Message<B>)
|
||||
where B: BlockT {
|
||||
if self.is_closing {
|
||||
return
|
||||
}
|
||||
|
||||
self.send_queue.push_back(data.into_bytes());
|
||||
self.send_queue.push_back(data.encode());
|
||||
}
|
||||
}
|
||||
|
||||
/// Implemented on messages that can be sent or received on the network.
|
||||
pub trait CustomMessage {
|
||||
/// Turns a message into the raw bytes to send over the network.
|
||||
fn into_bytes(self) -> Vec<u8>;
|
||||
|
||||
/// Tries to parse `bytes` received from the network into a message.
|
||||
fn from_bytes(bytes: &[u8]) -> Result<Self, ()>
|
||||
where Self: Sized;
|
||||
}
|
||||
|
||||
/// Event produced by the `RegisteredProtocolSubstream`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RegisteredProtocolEvent<TMessage> {
|
||||
pub enum RegisteredProtocolEvent<B: BlockT> {
|
||||
/// Received a message from the remote.
|
||||
Message(TMessage),
|
||||
Message(Message<B>),
|
||||
|
||||
/// Diagnostic event indicating that the connection is clogged and we should avoid sending too
|
||||
/// many messages to it.
|
||||
Clogged {
|
||||
/// Copy of the messages that are within the buffer, for further diagnostic.
|
||||
messages: Vec<TMessage>,
|
||||
messages: Vec<Message<B>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> Stream for RegisteredProtocolSubstream<TMessage, TSubstream>
|
||||
where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
|
||||
type Item = RegisteredProtocolEvent<TMessage>;
|
||||
impl<B, TSubstream> Stream for RegisteredProtocolSubstream<B, TSubstream>
|
||||
where TSubstream: AsyncRead + AsyncWrite, B: BlockT {
|
||||
type Item = RegisteredProtocolEvent<B>;
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
@@ -186,7 +179,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
|
||||
self.clogged_fuse = true;
|
||||
return Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged {
|
||||
messages: self.send_queue.iter()
|
||||
.map(|m| CustomMessage::from_bytes(&m))
|
||||
.map(|m| Decode::decode(&mut &m[..]).ok_or(()))
|
||||
.filter_map(Result::ok)
|
||||
.collect(),
|
||||
})))
|
||||
@@ -206,7 +199,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
|
||||
// Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever.
|
||||
match self.inner.poll()? {
|
||||
Async::Ready(Some(data)) => {
|
||||
let message = <TMessage as CustomMessage>::from_bytes(&data)
|
||||
let message = <Message<B> as Decode>::decode(&mut &data[..]).ok_or(())
|
||||
.map_err(|()| {
|
||||
warn!(target: "sub-libp2p", "Couldn't decode packet sent by the remote: {:?}", data);
|
||||
io::ErrorKind::InvalidData
|
||||
@@ -224,7 +217,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage> UpgradeInfo for RegisteredProtocol<TMessage> {
|
||||
impl<B> UpgradeInfo for RegisteredProtocol<B> {
|
||||
type Info = RegisteredProtocolName;
|
||||
type InfoIter = VecIntoIter<Self::Info>;
|
||||
|
||||
@@ -259,10 +252,10 @@ impl ProtocolName for RegisteredProtocolName {
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocol<TMessage>
|
||||
impl<B, TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocol<B>
|
||||
where TSubstream: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Output = RegisteredProtocolSubstream<TMessage, TSubstream>;
|
||||
type Output = RegisteredProtocolSubstream<B, TSubstream>;
|
||||
type Future = future::FutureResult<Self::Output, io::Error>;
|
||||
type Error = io::Error;
|
||||
|
||||
@@ -290,7 +283,7 @@ where TSubstream: AsyncRead + AsyncWrite,
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocol<TMessage>
|
||||
impl<B, TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocol<B>
|
||||
where TSubstream: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Output = <Self as InboundUpgrade<TSubstream>>::Output;
|
||||
|
||||
@@ -111,7 +111,7 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
|
||||
/// When asked for a proof of finality, we use this struct to build one.
|
||||
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
|
||||
/// Handles opening the unique substream and sending and receiving raw messages.
|
||||
behaviour: CustomProto<Message<B>, Substream<StreamMuxerBox>>,
|
||||
behaviour: CustomProto<B, Substream<StreamMuxerBox>>,
|
||||
}
|
||||
|
||||
/// A peer that we are connected to
|
||||
@@ -150,7 +150,7 @@ pub struct PeerInfo<B: BlockT> {
|
||||
}
|
||||
|
||||
struct OnDemandIn<'a, B: BlockT> {
|
||||
behaviour: &'a mut CustomProto<Message<B>, Substream<StreamMuxerBox>>,
|
||||
behaviour: &'a mut CustomProto<B, Substream<StreamMuxerBox>>,
|
||||
peerset: peerset::PeersetHandle,
|
||||
}
|
||||
|
||||
@@ -281,7 +281,7 @@ pub trait Context<B: BlockT> {
|
||||
|
||||
/// Protocol context.
|
||||
struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> {
|
||||
behaviour: &'a mut CustomProto<Message<B>, Substream<StreamMuxerBox>>,
|
||||
behaviour: &'a mut CustomProto<B, Substream<StreamMuxerBox>>,
|
||||
context_data: &'a mut ContextData<B, H>,
|
||||
peerset_handle: &'a peerset::PeersetHandle,
|
||||
}
|
||||
@@ -289,7 +289,7 @@ struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> {
|
||||
impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> {
|
||||
fn new(
|
||||
context_data: &'a mut ContextData<B, H>,
|
||||
behaviour: &'a mut CustomProto<Message<B>, Substream<StreamMuxerBox>>,
|
||||
behaviour: &'a mut CustomProto<B, Substream<StreamMuxerBox>>,
|
||||
peerset_handle: &'a peerset::PeersetHandle,
|
||||
) -> Self {
|
||||
ProtocolContext { context_data, peerset_handle, behaviour }
|
||||
@@ -1479,7 +1479,7 @@ pub enum CustomMessageOutcome<B: BlockT> {
|
||||
}
|
||||
|
||||
fn send_message<B: BlockT, H: ExHashT>(
|
||||
behaviour: &mut CustomProto<Message<B>, Substream<StreamMuxerBox>>,
|
||||
behaviour: &mut CustomProto<B, Substream<StreamMuxerBox>>,
|
||||
peers: &mut HashMap<PeerId, Peer<B, H>>,
|
||||
who: PeerId,
|
||||
mut message: Message<B>,
|
||||
@@ -1500,7 +1500,7 @@ fn send_message<B: BlockT, H: ExHashT>(
|
||||
|
||||
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviour for
|
||||
Protocol<B, S, H> {
|
||||
type ProtocolsHandler = <CustomProto<Message<B>, Substream<StreamMuxerBox>> as NetworkBehaviour>::ProtocolsHandler;
|
||||
type ProtocolsHandler = <CustomProto<B, Substream<StreamMuxerBox>> as NetworkBehaviour>::ProtocolsHandler;
|
||||
type OutEvent = CustomMessageOutcome<B>;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||
|
||||
@@ -125,7 +125,6 @@ pub struct RemoteReadResponse {
|
||||
|
||||
/// Generic types.
|
||||
pub mod generic {
|
||||
use crate::custom_proto::CustomMessage;
|
||||
use parity_codec::{Encode, Decode};
|
||||
use runtime_primitives::Justification;
|
||||
use crate::config::Roles;
|
||||
@@ -210,18 +209,6 @@ pub mod generic {
|
||||
ChainSpecific(Vec<u8>),
|
||||
}
|
||||
|
||||
impl<Header, Hash, Number, Extrinsic> CustomMessage for Message<Header, Hash, Number, Extrinsic>
|
||||
where Self: Decode + Encode
|
||||
{
|
||||
fn into_bytes(self) -> Vec<u8> {
|
||||
self.encode()
|
||||
}
|
||||
|
||||
fn from_bytes(bytes: &[u8]) -> Result<Self, ()> {
|
||||
Decode::decode(&mut &bytes[..]).ok_or(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Status sent on connection.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
|
||||
pub struct Status<Hash, Number> {
|
||||
|
||||
Reference in New Issue
Block a user