Make network-libp2p's Service generic over the message (#1708)

* Make network-libp2p's Service generic over the message

* Apply suggestions from code review

Co-Authored-By: tomaka <pierre.krieger1708@gmail.com>

* Fix warning
This commit is contained in:
Pierre Krieger
2019-02-12 15:36:15 +01:00
committed by Bastian Köcher
parent 1f05a47cdb
commit 9e999cdd81
13 changed files with 262 additions and 224 deletions
+23 -24
View File
@@ -16,7 +16,6 @@
use crate::custom_proto::{CustomProtos, CustomProtosOut, RegisteredProtocols};
use crate::{NetworkConfiguration, ProtocolId};
use bytes::Bytes;
use futures::prelude::*;
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey};
@@ -33,12 +32,12 @@ use void;
/// General behaviour of the network.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOut", poll_method = "poll")]
pub struct Behaviour<TSubstream> {
#[behaviour(out_event = "BehaviourOut<TMessage>", poll_method = "poll")]
pub struct Behaviour<TMessage, TSubstream> {
/// Periodically ping nodes, and close the connection if it's unresponsive.
ping: Ping<TSubstream>,
/// Custom protocols (dot, bbq, sub, etc.).
custom_protocols: CustomProtos<TSubstream>,
custom_protocols: CustomProtos<TMessage, TSubstream>,
/// Discovers nodes of the network. Defined below.
discovery: DiscoveryBehaviour<TSubstream>,
/// Periodically identifies the remote and responds to incoming requests.
@@ -46,13 +45,13 @@ pub struct Behaviour<TSubstream> {
/// Queue of events to produce for the outside.
#[behaviour(ignore)]
events: Vec<BehaviourOut>,
events: Vec<BehaviourOut<TMessage>>,
}
impl<TSubstream> Behaviour<TSubstream> {
impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
/// Builds a new `Behaviour`.
// TODO: redundancy between config and local_public_key (https://github.com/libp2p/rust-libp2p/issues/745)
pub fn new(config: &NetworkConfiguration, local_public_key: PublicKey, protocols: RegisteredProtocols) -> Self {
pub fn new(config: &NetworkConfiguration, local_public_key: PublicKey, protocols: RegisteredProtocols<TMessage>) -> Self {
let identify = {
let proto_version = "/substrate/1.0".to_string();
let user_agent = format!("{} ({})", config.client_version, config.node_name);
@@ -78,7 +77,7 @@ impl<TSubstream> Behaviour<TSubstream> {
/// Also note that even we have a valid open substream, it may in fact be already closed
/// without us knowing, in which case the packet will not be received.
#[inline]
pub fn send_custom_message(&mut self, target: &PeerId, protocol_id: ProtocolId, data: impl Into<Bytes>) {
pub fn send_custom_message(&mut self, target: &PeerId, protocol_id: ProtocolId, data: TMessage) {
self.custom_protocols.send_packet(target, protocol_id, data)
}
@@ -147,7 +146,7 @@ impl<TSubstream> Behaviour<TSubstream> {
/// Event that can be emitted by the behaviour.
#[derive(Debug)]
pub enum BehaviourOut {
pub enum BehaviourOut<TMessage> {
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Identifier of the protocol.
@@ -176,8 +175,8 @@ pub enum BehaviourOut {
peer_id: PeerId,
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Data that has been received.
data: Bytes,
/// Message that has been received.
message: TMessage,
},
/// A substream with a remote is clogged. We should avoid sending more data to it if possible.
@@ -187,7 +186,7 @@ pub enum BehaviourOut {
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<Bytes>,
messages: Vec<TMessage>,
},
/// We have obtained debug information from a peer.
@@ -199,8 +198,8 @@ pub enum BehaviourOut {
},
}
impl From<CustomProtosOut> for BehaviourOut {
fn from(other: CustomProtosOut) -> BehaviourOut {
impl<TMessage> From<CustomProtosOut<TMessage>> for BehaviourOut<TMessage> {
fn from(other: CustomProtosOut<TMessage>) -> BehaviourOut<TMessage> {
match other {
CustomProtosOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint } => {
BehaviourOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint }
@@ -208,8 +207,8 @@ impl From<CustomProtosOut> for BehaviourOut {
CustomProtosOut::CustomProtocolClosed { protocol_id, peer_id, result } => {
BehaviourOut::CustomProtocolClosed { protocol_id, peer_id, result }
}
CustomProtosOut::CustomMessage { protocol_id, peer_id, data } => {
BehaviourOut::CustomMessage { protocol_id, peer_id, data }
CustomProtosOut::CustomMessage { protocol_id, peer_id, message } => {
BehaviourOut::CustomMessage { protocol_id, peer_id, message }
}
CustomProtosOut::Clogged { protocol_id, peer_id, messages } => {
BehaviourOut::Clogged { protocol_id, peer_id, messages }
@@ -218,19 +217,19 @@ impl From<CustomProtosOut> for BehaviourOut {
}
}
impl<TSubstream> NetworkBehaviourEventProcess<void::Void> for Behaviour<TSubstream> {
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<void::Void> for Behaviour<TMessage, TSubstream> {
fn inject_event(&mut self, event: void::Void) {
void::unreachable(event)
}
}
impl<TSubstream> NetworkBehaviourEventProcess<CustomProtosOut> for Behaviour<TSubstream> {
fn inject_event(&mut self, event: CustomProtosOut) {
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<CustomProtosOut<TMessage>> for Behaviour<TMessage, TSubstream> {
fn inject_event(&mut self, event: CustomProtosOut<TMessage>) {
self.events.push(event.into());
}
}
impl<TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TSubstream> {
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TMessage, TSubstream> {
fn inject_event(&mut self, event: IdentifyEvent) {
match event {
IdentifyEvent::Identified { peer_id, mut info, .. } => {
@@ -260,7 +259,7 @@ impl<TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TSubs
}
}
impl<TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behaviour<TSubstream> {
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behaviour<TMessage, TSubstream> {
fn inject_event(&mut self, out: KademliaOut) {
match out {
KademliaOut::Discovered { peer_id, addresses, ty } => {
@@ -282,7 +281,7 @@ impl<TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behaviour<TSubstr
}
}
impl<TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TSubstream> {
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TMessage, TSubstream> {
fn inject_event(&mut self, event: PingEvent) {
match event {
PingEvent::PingSuccess { peer, time } => {
@@ -292,8 +291,8 @@ impl<TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TSubstrea
}
}
impl<TSubstream> Behaviour<TSubstream> {
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut>> {
impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut<TMessage>>> {
if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
}
@@ -16,10 +16,9 @@
use crate::custom_proto::handler::{CustomProtosHandler, CustomProtosHandlerOut, CustomProtosHandlerIn};
use crate::custom_proto::topology::NetTopology;
use crate::custom_proto::upgrade::RegisteredProtocols;
use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocols};
use crate::{NetworkConfiguration, NonReservedPeerMode, ProtocolId};
use crate::parse_str_addr;
use bytes::Bytes;
use fnv::{FnvHashMap, FnvHashSet};
use futures::prelude::*;
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
@@ -36,9 +35,9 @@ const NODES_FILE: &str = "nodes.json";
const PEER_DISABLE_DURATION: Duration = Duration::from_secs(5 * 60);
/// Network behaviour that handles opening substreams for custom protocols with other nodes.
pub struct CustomProtos<TSubstream> {
pub struct CustomProtos<TMessage, TSubstream> {
/// List of protocols to open with peers. Never modified.
registered_protocols: RegisteredProtocols,
registered_protocols: RegisteredProtocols<TMessage>,
/// Topology of the network.
topology: NetTopology,
@@ -77,7 +76,7 @@ pub struct CustomProtos<TSubstream> {
next_connect_to_nodes: Delay,
/// Events to produce from `poll()`.
events: SmallVec<[NetworkBehaviourAction<CustomProtosHandlerIn, CustomProtosOut>; 4]>,
events: SmallVec<[NetworkBehaviourAction<CustomProtosHandlerIn<TMessage>, CustomProtosOut<TMessage>>; 4]>,
/// Marker to pin the generics.
marker: PhantomData<TSubstream>,
@@ -85,7 +84,7 @@ pub struct CustomProtos<TSubstream> {
/// Event that can be emitted by the `CustomProtos`.
#[derive(Debug)]
pub enum CustomProtosOut {
pub enum CustomProtosOut<TMessage> {
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Identifier of the protocol.
@@ -114,25 +113,25 @@ pub enum CustomProtosOut {
peer_id: PeerId,
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Data that has been received.
data: Bytes,
/// Message that has been received.
message: TMessage,
},
/// The substream used by the protocol is pretty large. We should print avoid sending more
/// data on it if possible.
/// messages on it if possible.
Clogged {
/// Id of the peer which is clogged.
peer_id: PeerId,
/// Protocol which has a problem.
protocol_id: ProtocolId,
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<Bytes>,
messages: Vec<TMessage>,
},
}
impl<TSubstream> CustomProtos<TSubstream> {
impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
/// Creates a `CustomProtos`.
pub fn new(config: &NetworkConfiguration, local_peer_id: &PeerId, registered_protocols: RegisteredProtocols) -> Self {
pub fn new(config: &NetworkConfiguration, local_peer_id: &PeerId, registered_protocols: RegisteredProtocols<TMessage>) -> Self {
// Initialize the topology of the network.
let mut topology = if let Some(ref path) = config.net_config_path {
let path = Path::new(path).join(NODES_FILE);
@@ -265,12 +264,12 @@ impl<TSubstream> CustomProtos<TSubstream> {
///
/// Also note that even we have a valid open substream, it may in fact be already closed
/// without us knowing, in which case the packet will not be received.
pub fn send_packet(&mut self, target: &PeerId, protocol_id: ProtocolId, data: impl Into<Bytes>) {
pub fn send_packet(&mut self, target: &PeerId, protocol_id: ProtocolId, message: TMessage) {
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: target.clone(),
event: CustomProtosHandlerIn::SendCustomMessage {
protocol: protocol_id,
data: data.into(),
message,
}
});
}
@@ -369,12 +368,13 @@ impl<TSubstream> CustomProtos<TSubstream> {
}
}
impl<TSubstream> NetworkBehaviour for CustomProtos<TSubstream>
impl<TMessage, TSubstream> NetworkBehaviour for CustomProtos<TMessage, TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
TMessage: CustomMessage,
{
type ProtocolsHandler = CustomProtosHandler<TSubstream>;
type OutEvent = CustomProtosOut;
type ProtocolsHandler = CustomProtosHandler<TMessage, TSubstream>;
type OutEvent = CustomProtosOut<TMessage>;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
CustomProtosHandler::new(self.registered_protocols.clone())
@@ -550,14 +550,14 @@ where
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
}
}
CustomProtosHandlerOut::CustomMessage { protocol_id, data } => {
CustomProtosHandlerOut::CustomMessage { protocol_id, message } => {
debug_assert!(self.open_protocols.iter().any(|(s, p)|
s == &source && p == &protocol_id
));
let event = CustomProtosOut::CustomMessage {
peer_id: source,
protocol_id,
data,
message,
};
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
@@ -15,8 +15,8 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::ProtocolId;
use crate::custom_proto::upgrade::{RegisteredProtocol, RegisteredProtocols, RegisteredProtocolSubstream, RegisteredProtocolEvent};
use bytes::Bytes;
use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol, RegisteredProtocols};
use crate::custom_proto::upgrade::{RegisteredProtocolSubstream, RegisteredProtocolEvent};
use futures::prelude::*;
use libp2p::core::{
ProtocolsHandler, ProtocolsHandlerEvent,
@@ -36,29 +36,29 @@ use void::Void;
/// `Enable` message.
/// The handler can then be enabled and disabled at any time with the `Enable` and `Disable`
/// messages.
pub struct CustomProtosHandler<TSubstream> {
pub struct CustomProtosHandler<TMessage, TSubstream> {
/// List of all the protocols we support.
protocols: RegisteredProtocols,
protocols: RegisteredProtocols<TMessage>,
/// See the documentation of `State`.
state: State<TSubstream>,
state: State<TMessage, TSubstream>,
/// Value to be returned by `connection_keep_alive()`.
keep_alive: KeepAlive,
/// The active substreams. There should always ever be only one substream per protocol.
substreams: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 6]>,
substreams: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 6]>,
/// Queue of events to send to the outside.
events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol, ProtocolId, CustomProtosHandlerOut>; 16]>,
events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol<TMessage>, ProtocolId, CustomProtosHandlerOut<TMessage>>; 16]>,
}
/// State of the handler.
enum State<TSubstream> {
enum State<TMessage, TSubstream> {
/// Waiting for the behaviour to tell the handler whether it is enabled or disabled.
/// Contains a list of substreams opened by the remote and that we will integrate to
/// `substreams` only if we get enabled.
Init(SmallVec<[RegisteredProtocolSubstream<TSubstream>; 6]>),
Init(SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 6]>),
/// Normal functionning.
Normal,
@@ -74,7 +74,7 @@ enum State<TSubstream> {
/// Event that can be received by a `CustomProtosHandler`.
#[derive(Debug)]
pub enum CustomProtosHandlerIn {
pub enum CustomProtosHandlerIn<TMessage> {
/// The node should start using custom protocols and actively open substreams.
EnableActive,
@@ -88,14 +88,14 @@ pub enum CustomProtosHandlerIn {
SendCustomMessage {
/// The protocol to use.
protocol: ProtocolId,
/// The data to send.
data: Bytes,
/// The message to send.
message: TMessage,
},
}
/// Event that can be emitted by a `CustomProtosHandler`.
#[derive(Debug)]
pub enum CustomProtosHandlerOut {
pub enum CustomProtosHandlerOut<TMessage> {
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Identifier of the protocol.
@@ -116,8 +116,8 @@ pub enum CustomProtosHandlerOut {
CustomMessage {
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Data that has been received.
data: Bytes,
/// Message that has been received.
message: TMessage,
},
/// A substream to the remote is clogged. The send buffer is very large, and we should print
@@ -126,7 +126,7 @@ pub enum CustomProtosHandlerOut {
/// Protocol which is clogged.
protocol_id: ProtocolId,
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<Bytes>,
messages: Vec<TMessage>,
},
/// An error has happened on the protocol level with this node.
@@ -138,12 +138,12 @@ pub enum CustomProtosHandlerOut {
},
}
impl<TSubstream> CustomProtosHandler<TSubstream>
impl<TMessage, TSubstream> CustomProtosHandler<TMessage, TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Builds a new `CustomProtosHandler`.
pub fn new(protocols: RegisteredProtocols) -> Self {
pub fn new(protocols: RegisteredProtocols<TMessage>) -> Self {
CustomProtosHandler {
protocols,
// We keep the connection alive for at least 5 seconds, waiting for what happens.
@@ -157,7 +157,7 @@ where
/// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`.
fn inject_fully_negotiated(
&mut self,
proto: RegisteredProtocolSubstream<TSubstream>,
proto: RegisteredProtocolSubstream<TMessage, TSubstream>,
) {
if self.substreams.iter().any(|p| p.protocol_id() == proto.protocol_id()) {
// Skipping protocol that's already open.
@@ -189,16 +189,14 @@ where
}
}
impl<TSubstream> ProtocolsHandler for CustomProtosHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
type InEvent = CustomProtosHandlerIn;
type OutEvent = CustomProtosHandlerOut;
impl<TMessage, TSubstream> ProtocolsHandler for CustomProtosHandler<TMessage, TSubstream>
where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
type InEvent = CustomProtosHandlerIn<TMessage>;
type OutEvent = CustomProtosHandlerOut<TMessage>;
type Substream = TSubstream;
type Error = Void;
type InboundProtocol = RegisteredProtocols;
type OutboundProtocol = RegisteredProtocol;
type InboundProtocol = RegisteredProtocols<TMessage>;
type OutboundProtocol = RegisteredProtocol<TMessage>;
type OutboundOpenInfo = ProtocolId;
#[inline]
@@ -222,7 +220,7 @@ where
self.inject_fully_negotiated(proto);
}
fn inject_event(&mut self, message: CustomProtosHandlerIn) {
fn inject_event(&mut self, message: CustomProtosHandlerIn<TMessage>) {
match message {
CustomProtosHandlerIn::Disable => {
match self.state {
@@ -271,7 +269,7 @@ where
}
}
},
CustomProtosHandlerIn::SendCustomMessage { protocol, data } => {
CustomProtosHandlerIn::SendCustomMessage { protocol, message } => {
debug_assert!(self.protocols.has_protocol(protocol),
"invalid protocol id requested in the API of the libp2p networking");
let proto = match self.substreams.iter_mut().find(|p| p.protocol_id() == protocol) {
@@ -285,7 +283,7 @@ where
},
};
proto.send_message(data);
proto.send_message(message);
},
}
}
@@ -342,10 +340,10 @@ where
for n in (0..self.substreams.len()).rev() {
let mut substream = self.substreams.swap_remove(n);
match substream.poll() {
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(data)))) => {
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => {
let event = CustomProtosHandlerOut::CustomMessage {
protocol_id: substream.protocol_id(),
data
message
};
self.substreams.push(substream);
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event)))
@@ -389,7 +387,7 @@ where
}
}
impl<TSubstream> fmt::Debug for CustomProtosHandler<TSubstream>
impl<TMessage, TSubstream> fmt::Debug for CustomProtosHandler<TMessage, TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
pub use self::behaviour::{CustomProtos, CustomProtosOut};
pub use self::upgrade::{RegisteredProtocol, RegisteredProtocols};
pub use self::upgrade::{CustomMessage, RegisteredProtocol, RegisteredProtocols};
mod behaviour;
mod handler;
@@ -18,7 +18,8 @@ use crate::ProtocolId;
use bytes::Bytes;
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
use libp2p::tokio_codec::Framed;
use std::{collections::VecDeque, io, vec::IntoIter as VecIntoIter};
use log::warn;
use std::{collections::VecDeque, io, marker::PhantomData, vec::IntoIter as VecIntoIter};
use futures::{prelude::*, future, stream};
use tokio_io::{AsyncRead, AsyncWrite};
use unsigned_varint::codec::UviBytes;
@@ -27,8 +28,7 @@ use unsigned_varint::codec::UviBytes;
///
/// Note that "a single protocol" here refers to `par` for example. However
/// each protocol can have multiple different versions for networking purposes.
#[derive(Clone)]
pub struct RegisteredProtocol {
pub struct RegisteredProtocol<TMessage> {
/// Id of the protocol for API purposes.
id: ProtocolId,
/// Base name of the protocol as advertised on the network.
@@ -37,9 +37,11 @@ pub struct RegisteredProtocol {
/// List of protocol versions that we support.
/// Ordered in descending order so that the best comes first.
supported_versions: Vec<u8>,
/// Marker to pin the generic.
marker: PhantomData<TMessage>,
}
impl RegisteredProtocol {
impl<TMessage> RegisteredProtocol<TMessage> {
/// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be
/// passed inside the `RegisteredProtocolOutput`.
pub fn new(protocol: ProtocolId, versions: &[u8])
@@ -56,6 +58,7 @@ impl RegisteredProtocol {
tmp.sort_unstable_by(|a, b| b.cmp(&a));
tmp
},
marker: PhantomData,
}
}
@@ -66,16 +69,27 @@ impl RegisteredProtocol {
}
}
impl<TMessage> Clone for RegisteredProtocol<TMessage> {
fn clone(&self) -> Self {
RegisteredProtocol {
id: self.id,
base_name: self.base_name.clone(),
supported_versions: self.supported_versions.clone(),
marker: PhantomData,
}
}
}
/// Output of a `RegisteredProtocol` upgrade.
pub struct RegisteredProtocolSubstream<TSubstream> {
pub struct RegisteredProtocolSubstream<TMessage, TSubstream> {
/// If true, we are in the process of closing the sink.
is_closing: bool,
/// Buffer of packets to send.
send_queue: VecDeque<Bytes>,
send_queue: VecDeque<Vec<u8>>,
/// If true, we should call `poll_complete` on the inner sink.
requires_poll_complete: bool,
/// The underlying substream.
inner: stream::Fuse<Framed<TSubstream, UviBytes<Bytes>>>,
inner: stream::Fuse<Framed<TSubstream, UviBytes<Vec<u8>>>>,
/// Id of the protocol.
protocol_id: ProtocolId,
/// Version of the protocol that was negotiated.
@@ -83,9 +97,11 @@ pub struct RegisteredProtocolSubstream<TSubstream> {
/// If true, we have sent a "remote is clogged" event recently and shouldn't send another one
/// unless the buffer empties then fills itself again.
clogged_fuse: bool,
/// Marker to pin the generic.
marker: PhantomData<TMessage>,
}
impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
impl<TMessage, TSubstream> RegisteredProtocolSubstream<TMessage, TSubstream> {
/// Returns the protocol id.
#[inline]
pub fn protocol_id(&self) -> ProtocolId {
@@ -110,33 +126,53 @@ impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
}
/// Sends a message to the substream.
pub fn send_message(&mut self, data: Bytes) {
pub fn send_message(&mut self, data: TMessage)
where TMessage: CustomMessage {
if self.is_closing {
return
}
self.send_queue.push_back(data);
self.send_queue.push_back(data.into_bytes());
}
}
/// Implemented on messages that can be sent or received on the network.
pub trait CustomMessage {
/// Turns a message into raw bytes.
fn into_bytes(self) -> Vec<u8>;
/// Tries to part `bytes` into a message.
fn from_bytes(bytes: &[u8]) -> Result<Self, ()>
where Self: Sized;
}
/// This trait implementation exists mostly for testing convenience.
impl CustomMessage for Vec<u8> {
fn into_bytes(self) -> Vec<u8> {
self
}
fn from_bytes(bytes: &[u8]) -> Result<Self, ()> {
Ok(bytes.to_vec())
}
}
/// Event produced by the `RegisteredProtocolSubstream`.
#[derive(Debug, Clone)]
pub enum RegisteredProtocolEvent {
pub enum RegisteredProtocolEvent<TMessage> {
/// Received a message from the remote.
Message(Bytes),
Message(TMessage),
/// Diagnostic event indicating that the connection is clogged and we should avoid sending too
/// many messages to it.
Clogged {
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<Bytes>,
messages: Vec<TMessage>,
},
}
impl<TSubstream> Stream for RegisteredProtocolSubstream<TSubstream>
where TSubstream: AsyncRead + AsyncWrite,
{
type Item = RegisteredProtocolEvent;
impl<TMessage, TSubstream> Stream for RegisteredProtocolSubstream<TMessage, TSubstream>
where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
type Item = RegisteredProtocolEvent<TMessage>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@@ -164,7 +200,10 @@ where TSubstream: AsyncRead + AsyncWrite,
// thus never read any message from the network.
self.clogged_fuse = true;
return Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged {
messages: self.send_queue.iter().cloned().collect(),
messages: self.send_queue.iter()
.map(|m| CustomMessage::from_bytes(&m))
.filter_map(Result::ok)
.collect(),
})))
}
} else {
@@ -181,8 +220,14 @@ where TSubstream: AsyncRead + AsyncWrite,
// Receiving incoming packets.
// Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever.
match self.inner.poll()? {
Async::Ready(Some(data)) =>
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(data.freeze())))),
Async::Ready(Some(data)) => {
let message = <TMessage as CustomMessage>::from_bytes(&data)
.map_err(|()| {
warn!(target: "sub-libp2p", "Couldn't decode packet sent by the remote: {:?}", data);
io::ErrorKind::InvalidData
})?;
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message))))
},
Async::Ready(None) =>
if !self.requires_poll_complete && self.send_queue.is_empty() {
Ok(Async::Ready(None))
@@ -194,7 +239,7 @@ where TSubstream: AsyncRead + AsyncWrite,
}
}
impl UpgradeInfo for RegisteredProtocol {
impl<TMessage> UpgradeInfo for RegisteredProtocol<TMessage> {
type Info = RegisteredProtocolName;
type InfoIter = VecIntoIter<Self::Info>;
@@ -228,10 +273,10 @@ impl ProtocolName for RegisteredProtocolName {
}
}
impl<TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocol
impl<TMessage, TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocol<TMessage>
where TSubstream: AsyncRead + AsyncWrite,
{
type Output = RegisteredProtocolSubstream<TSubstream>;
type Output = RegisteredProtocolSubstream<TMessage, TSubstream>;
type Future = future::FutureResult<Self::Output, io::Error>;
type Error = io::Error;
@@ -250,11 +295,12 @@ where TSubstream: AsyncRead + AsyncWrite,
protocol_id: self.id,
protocol_version: info.version,
clogged_fuse: false,
marker: PhantomData,
})
}
}
impl<TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocol
impl<TMessage, TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocol<TMessage>
where TSubstream: AsyncRead + AsyncWrite,
{
type Output = <Self as InboundUpgrade<TSubstream>>::Output;
@@ -272,10 +318,9 @@ where TSubstream: AsyncRead + AsyncWrite,
}
// Connection upgrade for all the protocols contained in it.
#[derive(Clone)]
pub struct RegisteredProtocols(pub Vec<RegisteredProtocol>);
pub struct RegisteredProtocols<TMessage>(pub Vec<RegisteredProtocol<TMessage>>);
impl RegisteredProtocols {
impl<TMessage> RegisteredProtocols<TMessage> {
/// Returns the number of protocols.
#[inline]
pub fn len(&self) -> usize {
@@ -288,13 +333,13 @@ impl RegisteredProtocols {
}
}
impl Default for RegisteredProtocols {
impl<TMessage> Default for RegisteredProtocols<TMessage> {
fn default() -> Self {
RegisteredProtocols(Vec::new())
}
}
impl UpgradeInfo for RegisteredProtocols {
impl<TMessage> UpgradeInfo for RegisteredProtocols<TMessage> {
type Info = RegisteredProtocolsName;
type InfoIter = VecIntoIter<Self::Info>;
@@ -314,6 +359,12 @@ impl UpgradeInfo for RegisteredProtocols {
}
}
impl<TMessage> Clone for RegisteredProtocols<TMessage> {
fn clone(&self) -> Self {
RegisteredProtocols(self.0.clone())
}
}
/// Implementation of `ProtocolName` for several custom protocols.
#[derive(Debug, Clone)]
pub struct RegisteredProtocolsName {
@@ -329,11 +380,11 @@ impl ProtocolName for RegisteredProtocolsName {
}
}
impl<TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocols
impl<TMessage, TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocols<TMessage>
where TSubstream: AsyncRead + AsyncWrite,
{
type Output = <RegisteredProtocol as InboundUpgrade<TSubstream>>::Output;
type Future = <RegisteredProtocol as InboundUpgrade<TSubstream>>::Future;
type Output = <RegisteredProtocol<TMessage> as InboundUpgrade<TSubstream>>::Output;
type Future = <RegisteredProtocol<TMessage> as InboundUpgrade<TSubstream>>::Future;
type Error = io::Error;
#[inline]
@@ -349,7 +400,7 @@ where TSubstream: AsyncRead + AsyncWrite,
}
}
impl<TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocols
impl<TMessage, TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocols<TMessage>
where TSubstream: AsyncRead + AsyncWrite,
{
type Output = <Self as InboundUpgrade<TSubstream>>::Output;
+1 -1
View File
@@ -24,7 +24,7 @@ mod service_task;
mod traits;
mod transport;
pub use crate::custom_proto::RegisteredProtocol;
pub use crate::custom_proto::{CustomMessage, RegisteredProtocol};
pub use crate::error::{Error, ErrorKind, DisconnectReason};
pub use crate::secret::obtain_private_key;
pub use crate::service_task::{start_service, Service, ServiceEvent};
@@ -18,9 +18,8 @@ use crate::{
behaviour::Behaviour, behaviour::BehaviourOut, secret::obtain_private_key_from_config,
transport
};
use crate::custom_proto::{RegisteredProtocol, RegisteredProtocols};
use crate::custom_proto::{CustomMessage, RegisteredProtocol, RegisteredProtocols};
use crate::{Error, NetworkConfiguration, NodeIndex, ProtocolId, parse_str_addr};
use bytes::Bytes;
use fnv::FnvHashMap;
use futures::{prelude::*, Stream};
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId, multiaddr};
@@ -39,11 +38,12 @@ use tokio_timer::Interval;
/// Starts the substrate libp2p service.
///
/// Returns a stream that must be polled regularly in order for the networking to function.
pub fn start_service<TProtos>(
pub fn start_service<TProtos, TMessage>(
config: NetworkConfiguration,
registered_custom: TProtos,
) -> Result<Service, Error>
where TProtos: IntoIterator<Item = RegisteredProtocol> {
) -> Result<Service<TMessage>, Error>
where TProtos: IntoIterator<Item = RegisteredProtocol<TMessage>>,
TMessage: CustomMessage + Send + 'static {
if let Some(ref path) = config.net_config_path {
fs::create_dir_all(Path::new(path))?;
@@ -131,7 +131,7 @@ where TProtos: IntoIterator<Item = RegisteredProtocol> {
/// Event produced by the service.
#[derive(Debug)]
pub enum ServiceEvent {
pub enum ServiceEvent<TMessage> {
/// A custom protocol substream has been opened with a node.
OpenedCustomProtocol {
/// Index of the node.
@@ -172,8 +172,8 @@ pub enum ServiceEvent {
node_index: NodeIndex,
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Data that has been received.
data: Bytes,
/// Message that has been received.
message: TMessage,
},
/// The substream with a node is clogged. We should avoid sending data to it if possible.
@@ -183,14 +183,14 @@ pub enum ServiceEvent {
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<Bytes>,
messages: Vec<TMessage>,
},
}
/// Network service. Must be polled regularly in order for the networking to work.
pub struct Service {
pub struct Service<TMessage> where TMessage: CustomMessage {
/// Stream of events of the swarm.
swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), IoError>, Behaviour<Substream<StreamMuxerBox>>>,
swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), IoError>, Behaviour<TMessage, Substream<StreamMuxerBox>>>,
/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
bandwidth: Arc<transport::BandwidthSinks>,
@@ -209,7 +209,7 @@ pub struct Service {
cleanup: Interval,
/// Events to produce on the Stream.
injected_events: Vec<ServiceEvent>,
injected_events: Vec<ServiceEvent<TMessage>>,
}
/// Information about a node we're connected to.
@@ -223,7 +223,8 @@ struct NodeInfo {
client_version: Option<String>,
}
impl Service {
impl<TMessage> Service<TMessage>
where TMessage: CustomMessage + Send + 'static {
/// Returns an iterator that produces the list of addresses we're listening on.
#[inline]
pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
@@ -305,10 +306,10 @@ impl Service {
&mut self,
node_index: NodeIndex,
protocol: ProtocolId,
data: Vec<u8>
message: TMessage
) {
if let Some(peer_id) = self.nodes_info.get(&node_index).map(|info| &info.peer_id) {
self.swarm.send_custom_message(peer_id, protocol, data);
self.swarm.send_custom_message(peer_id, protocol, message);
} else {
warn!(target: "sub-libp2p", "Tried to send message to unknown node: {:}", node_index);
}
@@ -375,7 +376,7 @@ impl Service {
}
/// Polls for what happened on the network.
fn poll_swarm(&mut self) -> Poll<Option<ServiceEvent>, IoError> {
fn poll_swarm(&mut self) -> Poll<Option<ServiceEvent<TMessage>>, IoError> {
loop {
match self.swarm.poll() {
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { protocol_id, peer_id, version, endpoint }))) => {
@@ -397,12 +398,12 @@ impl Service {
debug_info: self.peer_debug_info(node_index),
})))
}
Ok(Async::Ready(Some(BehaviourOut::CustomMessage { protocol_id, peer_id, data }))) => {
Ok(Async::Ready(Some(BehaviourOut::CustomMessage { protocol_id, peer_id, message }))) => {
let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour");
break Ok(Async::Ready(Some(ServiceEvent::CustomMessage {
node_index,
protocol_id,
data,
message,
})))
}
Ok(Async::Ready(Some(BehaviourOut::Clogged { protocol_id, peer_id, messages }))) => {
@@ -431,7 +432,7 @@ impl Service {
}
/// Polls the stream that fires when we need to cleanup and flush the topology.
fn poll_cleanup(&mut self) -> Poll<Option<ServiceEvent>, IoError> {
fn poll_cleanup(&mut self) -> Poll<Option<ServiceEvent<TMessage>>, IoError> {
loop {
match self.cleanup.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
@@ -457,7 +458,7 @@ impl Service {
}
}
impl Drop for Service {
impl<TMessage> Drop for Service<TMessage> where TMessage: CustomMessage {
fn drop(&mut self) {
if let Err(err) = self.swarm.flush_topology() {
warn!(target: "sub-libp2p", "Failed to flush topology: {:?}", err);
@@ -465,8 +466,8 @@ impl Drop for Service {
}
}
impl Stream for Service {
type Item = ServiceEvent;
impl<TMessage> Stream for Service<TMessage> where TMessage: CustomMessage + Send + 'static {
type Item = ServiceEvent<TMessage>;
type Error = IoError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+11 -9
View File
@@ -16,12 +16,14 @@
use futures::{future, stream, prelude::*, try_ready};
use std::{io, iter};
use substrate_network_libp2p::{ServiceEvent, multiaddr};
use substrate_network_libp2p::{CustomMessage, ServiceEvent, multiaddr};
/// Builds two services. The second one and further have the first one as its bootstrap node.
/// This is to be used only for testing, and a panic will happen if something goes wrong.
fn build_nodes(num: usize) -> Vec<substrate_network_libp2p::Service> {
let mut result: Vec<substrate_network_libp2p::Service> = Vec::with_capacity(num);
fn build_nodes<TMsg>(num: usize) -> Vec<substrate_network_libp2p::Service<TMsg>>
where TMsg: CustomMessage + Send + 'static
{
let mut result: Vec<substrate_network_libp2p::Service<_>> = Vec::with_capacity(num);
for _ in 0 .. num {
let mut boot_nodes = Vec::new();
@@ -47,7 +49,7 @@ fn build_nodes(num: usize) -> Vec<substrate_network_libp2p::Service> {
#[test]
fn basic_two_nodes_connectivity() {
let (mut service1, mut service2) = {
let mut l = build_nodes(2).into_iter();
let mut l = build_nodes::<Vec<u8>>(2).into_iter();
let a = l.next().unwrap();
let b = l.next().unwrap();
(a, b)
@@ -86,7 +88,7 @@ fn two_nodes_transfer_lots_of_packets() {
const NUM_PACKETS: u32 = 20000;
let (mut service1, mut service2) = {
let mut l = build_nodes(2).into_iter();
let mut l = build_nodes::<Vec<u8>>(2).into_iter();
let a = l.next().unwrap();
let b = l.next().unwrap();
(a, b)
@@ -110,9 +112,9 @@ fn two_nodes_transfer_lots_of_packets() {
loop {
match try_ready!(service2.poll()) {
Some(ServiceEvent::OpenedCustomProtocol { .. }) => {},
Some(ServiceEvent::CustomMessage { data, .. }) => {
assert_eq!(data.len(), 1);
assert_eq!(u32::from(data[0]), packet_counter % 256);
Some(ServiceEvent::CustomMessage { message, .. }) => {
assert_eq!(message.len(), 1);
assert_eq!(u32::from(message[0]), packet_counter % 256);
packet_counter += 1;
if packet_counter == NUM_PACKETS {
return Ok(Async::Ready(()))
@@ -135,7 +137,7 @@ fn many_nodes_connectivity() {
// increased in the `NetworkConfiguration`.
const NUM_NODES: usize = 25;
let mut futures = build_nodes(NUM_NODES)
let mut futures = build_nodes::<Vec<u8>>(NUM_NODES)
.into_iter()
.map(move |mut node| {
let mut num_connecs = 0;
+14
View File
@@ -125,6 +125,8 @@ pub struct RemoteReadResponse {
/// Generic types.
pub mod generic {
use parity_codec::{Encode, Decode};
use network_libp2p::CustomMessage;
use runtime_primitives::Justification;
use parity_codec_derive::{Encode, Decode};
use crate::config::Roles;
@@ -197,6 +199,18 @@ pub mod generic {
ChainSpecific(Vec<u8>),
}
impl<Header, Hash, Number, Extrinsic> CustomMessage for Message<Header, Hash, Number, Extrinsic>
where Self: Decode + Encode
{
fn into_bytes(self) -> Vec<u8> {
self.encode()
}
fn from_bytes(bytes: &[u8]) -> Result<Self, ()> {
Decode::decode(&mut &bytes[..]).ok_or(())
}
}
/// Status sent on connection.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct Status<Hash, Number> {
+5 -6
View File
@@ -16,7 +16,6 @@
//! On-demand requests service.
use parity_codec::Encode;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Instant, Duration};
@@ -79,7 +78,7 @@ pub trait OnDemandService<Block: BlockT>: Send + Sync {
pub struct OnDemand<B: BlockT> {
core: Mutex<OnDemandCore<B>>,
checker: Arc<FetchChecker<B>>,
network_sender: Mutex<Option<NetworkChan>>,
network_sender: Mutex<Option<NetworkChan<B>>>,
}
/// On-demand remote call response.
@@ -150,11 +149,11 @@ impl<B: BlockT> OnDemand<B> where
}
/// Sets weak reference to network service.
pub fn set_network_sender(&self, network_sender: NetworkChan) {
pub fn set_network_sender(&self, network_sender: NetworkChan<B>) {
self.network_sender.lock().replace(network_sender);
}
fn send(&self, msg: NetworkMsg) {
fn send(&self, msg: NetworkMsg<B>) {
let _ = self.network_sender
.lock()
.as_ref()
@@ -459,7 +458,7 @@ impl<B> OnDemandCore<B> where
let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed");
request.timestamp = Instant::now();
trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer);
on_demand.send(NetworkMsg::Outgoing(peer, request.message().encode()));
on_demand.send(NetworkMsg::Outgoing(peer, request.message()));
self.active_peers.insert(peer, request);
}
@@ -604,7 +603,7 @@ pub mod tests {
}
}
fn assert_disconnected_peer(network_port: NetworkPort, expected_severity: Severity) {
fn assert_disconnected_peer(network_port: NetworkPort<Block>, expected_severity: Severity) {
let mut disconnect_count = 0;
while let Ok(msg) = network_port.receiver().try_recv() {
match msg {
+6 -7
View File
@@ -14,7 +14,6 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use parity_codec::Encode;
use crossbeam_channel::{self as channel, Receiver, Sender, select};
use network_libp2p::{NodeIndex, Severity};
use primitives::storage::StorageKey;
@@ -56,7 +55,7 @@ const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192;
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
network_chan: NetworkChan,
network_chan: NetworkChan<B>,
port: Receiver<ProtocolMsg<B, S>>,
config: ProtocolConfig,
on_demand: Option<Arc<OnDemandService<B>>>,
@@ -133,14 +132,14 @@ pub trait Context<B: BlockT> {
/// Protocol context.
pub(crate) struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> {
network_chan: &'a NetworkChan,
network_chan: &'a NetworkChan<B>,
context_data: &'a mut ContextData<B, H>,
}
impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> {
pub(crate) fn new(
context_data: &'a mut ContextData<B, H>,
network_chan: &'a NetworkChan,
network_chan: &'a NetworkChan<B>,
) -> Self {
ProtocolContext {
network_chan,
@@ -276,7 +275,7 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>,> {
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Create a new instance.
pub fn new<I: 'static + ImportQueue<B>>(
network_chan: NetworkChan,
network_chan: NetworkChan<B>,
config: ProtocolConfig,
chain: Arc<Client<B>>,
import_queue: Arc<I>,
@@ -1106,7 +1105,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
fn send_message<B: BlockT, H: ExHashT>(
peers: &mut HashMap<NodeIndex, Peer<B, H>>,
network_chan: &NetworkChan,
network_chan: &NetworkChan<B>,
who: NodeIndex,
mut message: Message<B>,
) {
@@ -1124,7 +1123,7 @@ fn send_message<B: BlockT, H: ExHashT>(
}
_ => (),
}
network_chan.send(NetworkMsg::Outgoing(who, message.encode()));
network_chan.send(NetworkMsg::Outgoing(who, message));
}
/// Construct a simple protocol that is composed of several sub protocols.
+31 -47
View File
@@ -25,8 +25,8 @@ use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, S
use network_libp2p::{Protocol as Libp2pProtocol, RegisteredProtocol};
use consensus::import_queue::{ImportQueue, Link};
use crate::consensus_gossip::ConsensusGossip;
use crate::message::Message;
use crate::protocol::{self, Context, Protocol, ProtocolMsg, ProtocolStatus, PeerInfo};
use parity_codec::Decode;
use crate::config::Params;
use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError};
use crate::error::Error;
@@ -72,7 +72,7 @@ pub struct NetworkLink<B: BlockT, S: NetworkSpecialization<B>> {
/// The protocol sender
pub(crate) protocol_sender: Sender<ProtocolMsg<B, S>>,
/// The network sender
pub(crate) network_sender: NetworkChan,
pub(crate) network_sender: NetworkChan<B>,
}
impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
@@ -108,7 +108,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
/// Substrate network service. Handles network IO and manages connectivity.
pub struct Service<B: BlockT + 'static, S: NetworkSpecialization<B>> {
/// Network service
network: Arc<Mutex<NetworkService>>,
network: Arc<Mutex<NetworkService<Message<B>>>>,
/// Protocol sender
protocol_sender: Sender<ProtocolMsg<B, S>>,
/// Sender for messages to the background service task, and handle for the background thread.
@@ -123,7 +123,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
params: Params<B, S, H>,
protocol_id: ProtocolId,
import_queue: Arc<I>,
) -> Result<(Arc<Service<B, S>>, NetworkChan), Error> {
) -> Result<(Arc<Service<B, S>>, NetworkChan<B>), Error> {
let (network_chan, network_port) = network_channel(protocol_id);
let protocol_sender = Protocol::new(
network_chan.clone(),
@@ -139,7 +139,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
let (thread, network) = start_thread(
protocol_sender.clone(),
network_port,
network_chan.clone(),
params.network_config,
registered,
)?;
@@ -332,7 +331,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ManageNetwork for Service
/// Create a NetworkPort/Chan pair.
pub fn network_channel(protocol_id: ProtocolId) -> (NetworkChan, NetworkPort) {
pub fn network_channel<B: BlockT + 'static>(protocol_id: ProtocolId) -> (NetworkChan<B>, NetworkPort<B>) {
let (network_sender, network_receiver) = channel::unbounded();
let task_notify = Arc::new(AtomicTask::new());
let network_port = NetworkPort::new(network_receiver, protocol_id, task_notify.clone());
@@ -343,14 +342,14 @@ pub fn network_channel(protocol_id: ProtocolId) -> (NetworkChan, NetworkPort) {
/// A sender of NetworkMsg that notifies a task when a message has been sent.
#[derive(Clone)]
pub struct NetworkChan {
sender: Sender<NetworkMsg>,
pub struct NetworkChan<B: BlockT + 'static> {
sender: Sender<NetworkMsg<B>>,
task_notify: Arc<AtomicTask>,
}
impl NetworkChan {
impl<B: BlockT + 'static> NetworkChan<B> {
/// Create a new network chan.
pub fn new(sender: Sender<NetworkMsg>, task_notify: Arc<AtomicTask>) -> Self {
pub fn new(sender: Sender<NetworkMsg<B>>, task_notify: Arc<AtomicTask>) -> Self {
NetworkChan {
sender,
task_notify,
@@ -358,13 +357,13 @@ impl NetworkChan {
}
/// Send a messaging, to be handled on a stream. Notify the task handling the stream.
pub fn send(&self, msg: NetworkMsg) {
pub fn send(&self, msg: NetworkMsg<B>) {
let _ = self.sender.send(msg);
self.task_notify.notify();
}
}
impl Drop for NetworkChan {
impl<B: BlockT + 'static> Drop for NetworkChan<B> {
/// Notifying the task when a sender is dropped(when all are dropped, the stream is finished).
fn drop(&mut self) {
self.task_notify.notify();
@@ -373,15 +372,15 @@ impl Drop for NetworkChan {
/// A receiver of NetworkMsg that makes the protocol-id available with each message.
pub struct NetworkPort {
receiver: Receiver<NetworkMsg>,
pub struct NetworkPort<B: BlockT + 'static> {
receiver: Receiver<NetworkMsg<B>>,
protocol_id: ProtocolId,
task_notify: Arc<AtomicTask>,
}
impl NetworkPort {
impl<B: BlockT + 'static> NetworkPort<B> {
/// Create a new network port for a given protocol-id.
pub fn new(receiver: Receiver<NetworkMsg>, protocol_id: ProtocolId, task_notify: Arc<AtomicTask>) -> Self {
pub fn new(receiver: Receiver<NetworkMsg<B>>, protocol_id: ProtocolId, task_notify: Arc<AtomicTask>) -> Self {
Self {
receiver,
protocol_id,
@@ -391,7 +390,7 @@ impl NetworkPort {
/// Receive a message, if any is currently-enqueued.
/// Register the current tokio task for notification when a new message is available.
pub fn take_one_message(&self) -> Result<Option<(ProtocolId, NetworkMsg)>, ()> {
pub fn take_one_message(&self) -> Result<Option<(ProtocolId, NetworkMsg<B>)>, ()> {
self.task_notify.register();
match self.receiver.try_recv() {
Ok(msg) => Ok(Some((self.protocol_id.clone(), msg))),
@@ -402,18 +401,18 @@ impl NetworkPort {
/// Get a reference to the underlying crossbeam receiver.
#[cfg(any(test, feature = "test-helpers"))]
pub fn receiver(&self) -> &Receiver<NetworkMsg> {
pub fn receiver(&self) -> &Receiver<NetworkMsg<B>> {
&self.receiver
}
}
/// Messages to be handled by NetworkService.
#[derive(Debug)]
pub enum NetworkMsg {
pub enum NetworkMsg<B: BlockT + 'static> {
/// Ask network to convert a list of nodes, to a list of peers.
PeerIds(Vec<NodeIndex>, Sender<Vec<(NodeIndex, Option<PeerId>)>>),
/// Send an outgoing custom message.
Outgoing(NodeIndex, Vec<u8>),
Outgoing(NodeIndex, Message<B>),
/// Report a peer.
ReportPeer(NodeIndex, Severity),
/// Get a peer id.
@@ -423,11 +422,10 @@ pub enum NetworkMsg {
/// Starts the background thread that handles the networking.
fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>(
protocol_sender: Sender<ProtocolMsg<B, S>>,
network_port: NetworkPort,
network_sender: NetworkChan,
network_port: NetworkPort<B>,
config: NetworkConfiguration,
registered: RegisteredProtocol,
) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc<Mutex<NetworkService>>), Error> {
registered: RegisteredProtocol<Message<B>>,
) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc<Mutex<NetworkService<Message<B>>>>), Error> {
let protocol_id = registered.id();
// Start the main service.
@@ -447,7 +445,7 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>(
let service_clone = service.clone();
let mut runtime = Runtime::new()?;
let thread = thread::Builder::new().name("network".to_string()).spawn(move || {
let fut = run_thread(protocol_sender, service_clone, network_sender, network_port, protocol_id)
let fut = run_thread(protocol_sender, service_clone, network_port, protocol_id)
.select(close_rx.then(|_| Ok(())))
.map(|(val, _)| val)
.map_err(|(err,_ )| err);
@@ -466,9 +464,8 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>(
/// Runs the background thread that handles the networking.
fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>(
protocol_sender: Sender<ProtocolMsg<B, S>>,
network_service: Arc<Mutex<NetworkService>>,
network_sender: NetworkChan,
network_port: NetworkPort,
network_service: Arc<Mutex<NetworkService<Message<B>>>>,
network_port: NetworkPort<B>,
protocol_id: ProtocolId,
) -> impl Future<Item = (), Error = io::Error> {
@@ -538,28 +535,15 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>(
NetworkServiceEvent::ClosedCustomProtocol { node_index, debug_info, .. } => {
let _ = protocol_sender.send(ProtocolMsg::PeerDisconnected(node_index, debug_info));
}
NetworkServiceEvent::CustomMessage { node_index, data, .. } => {
if let Some(m) = Decode::decode(&mut (&data as &[u8])) {
let _ = protocol_sender.send(ProtocolMsg::CustomMessage(node_index, m));
return Ok(())
}
let _ = network_sender.send(
NetworkMsg::ReportPeer(
node_index,
Severity::Bad("Peer sent us a packet with invalid format".to_string())
)
);
NetworkServiceEvent::CustomMessage { node_index, message, .. } => {
let _ = protocol_sender.send(ProtocolMsg::CustomMessage(node_index, message));
return Ok(())
}
NetworkServiceEvent::Clogged { node_index, messages, .. } => {
debug!(target: "sync", "{} clogging messages:", messages.len());
for msg_bytes in messages.iter().take(5) {
if let Some(msg) = Decode::decode(&mut (&msg_bytes as &[u8])) {
debug!(target: "sync", "{:?}", msg);
let _ = protocol_sender.send(ProtocolMsg::PeerClogged(node_index, Some(msg)));
} else {
debug!(target: "sync", "{:?}", msg_bytes);
let _ = protocol_sender.send(ProtocolMsg::PeerClogged(node_index, None));
}
for msg in messages.into_iter().take(5) {
debug!(target: "sync", "{:?}", msg);
let _ = protocol_sender.send(ProtocolMsg::PeerClogged(node_index, Some(msg)));
}
}
};
+13 -22
View File
@@ -28,7 +28,6 @@ use std::time::Duration;
use log::trace;
use client;
use client::block_builder::BlockBuilder;
use parity_codec::{Decode, Encode};
use crate::config::ProtocolConfig;
use consensus::import_queue::{import_many_blocks, ImportQueue, ImportQueueStatus, IncomingBlock};
use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport, Verifier};
@@ -39,7 +38,9 @@ use crossbeam_channel::{self as channel, Sender, select};
use futures::Future;
use futures::sync::{mpsc, oneshot};
use keyring::Keyring;
use network_libp2p::{NodeIndex, ProtocolId, Severity};
use crate::message::Message;
use network_libp2p::{NodeIndex, ProtocolId};
use parity_codec::Encode;
use parking_lot::Mutex;
use primitives::{H256, Ed25519AuthorityId};
use crate::protocol::{Context, Protocol, ProtocolMsg, ProtocolStatus};
@@ -236,9 +237,9 @@ pub type PeersClient = client::Client<test_client::Backend, test_client::Executo
pub struct Peer<V: 'static + Verifier<Block>, D> {
client: Arc<PeersClient>,
pub protocol_sender: Sender<ProtocolMsg<Block, DummySpecialization>>,
network_port: Mutex<NetworkPort>,
network_port: Mutex<NetworkPort<Block>>,
import_queue: Arc<SyncImportQueue<Block, V>>,
network_sender: NetworkChan,
network_sender: NetworkChan<Block>,
pub data: D,
}
@@ -247,8 +248,8 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
client: Arc<PeersClient>,
import_queue: Arc<SyncImportQueue<Block, V>>,
protocol_sender: Sender<ProtocolMsg<Block, DummySpecialization>>,
network_sender: NetworkChan,
network_port: NetworkPort,
network_sender: NetworkChan<Block>,
network_port: NetworkPort<Block>,
data: D,
) -> Self {
let network_port = Mutex::new(network_port);
@@ -304,24 +305,14 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
}
/// Receive a message from another peer. Return a set of peers to disconnect.
fn receive_message(&self, from: NodeIndex, msg: Vec<u8>) {
match Decode::decode(&mut (&msg as &[u8])) {
Some(m) => {
let _ = self
.protocol_sender
.send(ProtocolMsg::CustomMessage(from, m));
}
None => {
let _ = self.network_sender.send(NetworkMsg::ReportPeer(
from,
Severity::Bad("Peer sent us a packet with invalid format".to_string()),
));
}
}
fn receive_message(&self, from: NodeIndex, msg: Message<Block>) {
let _ = self
.protocol_sender
.send(ProtocolMsg::CustomMessage(from, msg));
}
/// Produce the next pending message to send to another peer.
fn pending_message(&self) -> Option<NetworkMsg> {
fn pending_message(&self) -> Option<NetworkMsg<Block>> {
select! {
recv(self.network_port.lock().receiver()) -> msg => return msg.ok(),
// If there are no messages ready, give protocol a change to send one.
@@ -330,7 +321,7 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
}
/// Produce the next pending message to send to another peer, without waiting.
fn pending_message_fast(&self) -> Option<NetworkMsg> {
fn pending_message_fast(&self) -> Option<NetworkMsg<Block>> {
self.network_port.lock().receiver().try_recv().ok()
}