Refactor out disable/disconnect peer to make API more declarative (#394)

* Refactor out disable/disconnect peer to make API more declarative

* Minor fixes.

* rename `disconnect_peer` to `drop_peer` in low-level
This commit is contained in:
Gav Wood
2018-07-21 16:13:19 +02:00
committed by GitHub
parent 625918394a
commit 76384d0afb
19 changed files with 263 additions and 216 deletions
+11 -11
View File
@@ -49,7 +49,7 @@ use parking_lot::Mutex;
use polkadot_consensus::{Statement, SignedStatement, GenericStatement};
use polkadot_primitives::{AccountId, Block, SessionKey, Hash, Header};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt, Collation};
use substrate_network::{PeerId, RequestId, Context};
use substrate_network::{PeerId, RequestId, Context, Severity};
use substrate_network::consensus_gossip::ConsensusGossip;
use substrate_network::{message, generic_message};
use substrate_network::specialization::Specialization;
@@ -403,7 +403,7 @@ impl PolkadotProtocol {
};
if !info.claimed_validator {
ctx.disable_peer(peer_id, "Session key broadcasted without setting authority role");
ctx.report_peer(peer_id, Severity::Bad("Session key broadcasted without setting authority role"));
return;
}
@@ -438,7 +438,7 @@ impl PolkadotProtocol {
self.pending.push(req);
self.dispatch_pending_requests(ctx);
}
None => ctx.disable_peer(peer_id, "Unexpected block data response"),
None => ctx.report_peer(peer_id, Severity::Bad("Unexpected block data response")),
}
}
@@ -453,9 +453,9 @@ impl PolkadotProtocol {
};
match info.validator_key {
None => ctx.disable_peer(
None => ctx.report_peer(
peer_id,
"Sent collator role without registering first as validator",
Severity::Bad("Sent collator role without registering first as validator"),
),
Some(key) => for (relay_parent, collation) in self.local_collations.note_validator_role(key, role) {
send_polkadot_message(
@@ -483,7 +483,7 @@ impl Specialization<Block> for PolkadotProtocol {
if let Some((ref acc_id, ref para_id)) = local_status.collating_for {
if self.collator_peer_id(acc_id.clone()).is_some() {
ctx.disconnect_peer(peer_id);
ctx.report_peer(peer_id, Severity::Useless("Unknown Polkadot-specific reason"));
return
}
@@ -571,7 +571,7 @@ impl Specialization<Block> for PolkadotProtocol {
Some(msg) => self.on_polkadot_message(ctx, peer_id, raw, msg),
None => {
trace!(target: "p_net", "Bad message from {}", peer_id);
ctx.disable_peer(peer_id, "Invalid polkadot protocol message format");
ctx.report_peer(peer_id, Severity::Bad("Invalid polkadot protocol message format"));
}
}
}
@@ -616,15 +616,15 @@ impl PolkadotProtocol {
let collated_acc = collation.receipt.collator;
match self.peers.get(&from) {
None => ctx.disconnect_peer(from),
None => ctx.report_peer(from, Severity::Useless("Unknown Polkadot specific reason")),
Some(peer_info) => match peer_info.collating_for {
None => ctx.disable_peer(from, "Sent collation without registering collator intent"),
None => ctx.report_peer(from, Severity::Bad("Sent collation without registering collator intent")),
Some((ref acc_id, ref para_id)) => {
let structurally_valid = para_id == &collation_para && acc_id == &collated_acc;
if structurally_valid && collation.receipt.check_signature().is_ok() {
self.collators.on_collation(acc_id.clone(), relay_parent, collation)
} else {
ctx.disable_peer(from, "Sent malformed collation")
ctx.report_peer(from, Severity::Bad("Sent malformed collation"))
};
}
},
@@ -654,7 +654,7 @@ impl PolkadotProtocol {
// disconnect a collator by account-id.
fn disconnect_bad_collator(&self, ctx: &mut Context<Block>, account_id: AccountId) {
if let Some(peer_id) = self.collator_peer_id(account_id) {
ctx.disable_peer(peer_id, "Consensus layer determined the given collator misbehaved")
ctx.report_peer(peer_id, Severity::Bad("Consensus layer determined the given collator misbehaved"))
}
}
}
+6 -7
View File
@@ -24,7 +24,7 @@ use polkadot_primitives::{Block, Hash, SessionKey};
use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData};
use substrate_primitives::H512;
use codec::Encode;
use substrate_network::{PeerId, PeerInfo, ClientHandle, Context, Roles, message::Message as SubstrateMessage, specialization::Specialization, generic_message::Message as GenericMessage};
use substrate_network::{Severity, PeerId, PeerInfo, ClientHandle, Context, Roles, message::Message as SubstrateMessage, specialization::Specialization, generic_message::Message as GenericMessage};
use std::sync::Arc;
use futures::Future;
@@ -41,12 +41,11 @@ impl Context<Block> for TestContext {
unimplemented!()
}
fn disable_peer(&mut self, peer: PeerId, _reason: &str) {
self.disabled.push(peer);
}
fn disconnect_peer(&mut self, peer: PeerId) {
self.disconnected.push(peer);
fn report_peer(&mut self, peer: PeerId, reason: Severity) {
match reason {
Severity::Bad(_) => self.disabled.push(peer),
_ => self.disconnected.push(peer),
}
}
fn peer_info(&self, _peer: PeerId) -> Option<PeerInfo<Block>> {
@@ -1,18 +1,18 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// This file is part of Substrate.
// Parity is free software: you can redistribute it and/or modify
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Connection filter trait.
@@ -1,18 +1,18 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// This file is part of Substrate.
// Polkadot is free software: you can redistribute it and/or modify
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.?
use bytes::{Bytes, BytesMut};
use ProtocolId;
@@ -122,6 +122,7 @@ where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
type MultiaddrFuture = Maf;
type Future = future::FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
#[allow(deprecated)]
fn upgrade(
self,
socket: C,
@@ -1,18 +1,18 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// This file is part of Substrate.
// Parity is free software: you can redistribute it and/or modify
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::{io, net, fmt};
use libc::{ENFILE, EMFILE};
@@ -1,18 +1,18 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// This file is part of Substrate.
// Polkadot is free software: you can redistribute it and/or modify
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.?
#![recursion_limit="128"]
#![type_length_limit = "268435456"]
@@ -1,18 +1,18 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// This file is part of Substrate.
// Polkadot is free software: you can redistribute it and/or modify
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.?
use bytes::Bytes;
use fnv::{FnvHashMap, FnvHashSet};
@@ -126,6 +126,45 @@ struct PeerConnectionInfo {
local_address: Option<Multiaddr>,
}
/// Simplified, POD version of PeerConnectionInfo.
#[derive(Debug, Clone)]
pub struct PeerInfo {
/// Id of the peer.
pub id: PeerstorePeerId,
/// True if this connection was initiated by us.
/// Note that it is theoretically possible that we dial the remote at the
/// same time they dial us, in which case the protocols may be dispatched
/// between both connections, and in which case the value here will be racy.
pub originated: bool,
/// Latest known ping duration.
pub ping: Option<Duration>,
/// The client version of the remote, or `None` if not known.
pub client_version: Option<String>,
/// The multiaddress of the remote, or `None` if not known.
pub remote_address: Option<Multiaddr>,
/// The local multiaddress used to communicate with the remote, or `None`
/// if not known.
pub local_address: Option<Multiaddr>,
}
impl<'a> From<&'a PeerConnectionInfo> for PeerInfo {
fn from(i: &'a PeerConnectionInfo) -> PeerInfo {
PeerInfo {
id: i.id.clone(),
originated: i.originated,
ping: i.ping.lock().clone(),
client_version: i.client_version.clone(),
remote_address: i.remote_address.clone(),
local_address: i.local_address.clone(),
}
}
}
impl NetworkState {
pub fn new(config: &NetworkConfiguration) -> Result<NetworkState, Error> {
// Private and public keys configuration.
@@ -581,24 +620,37 @@ impl NetworkState {
} else {
// We are connected to this peer, but not with the current
// protocol.
debug!(target: "sub-libp2p", "Tried to send message to peer {} \
for which we aren't connected with the requested protocol",
peer_id);
debug!(target: "sub-libp2p",
"Tried to send message to peer {} for which we aren't connected with the requested protocol",
peer_id
);
return Err(ErrorKind::PeerNotFound.into())
}
} else {
debug!(target: "sub-libp2p", "Tried to send message to invalid \
peer ID {}", peer_id);
debug!(target: "sub-libp2p", "Tried to send message to invalid peer ID {}", peer_id);
return Err(ErrorKind::PeerNotFound.into())
}
}
/// Get the info on a peer, if there's an active connection.
pub fn peer_info(&self, who: PeerId) -> Option<PeerInfo> {
self.connections.read().info_by_peer.get(&who).map(Into::into)
}
/// Disconnects a peer, if a connection exists (ie. drops the Kademlia
/// controller, and the senders that were stored in the `UniqueConnec` of
/// `custom_proto`).
pub fn disconnect_peer(&self, peer_id: PeerId) {
pub fn drop_peer(&self, peer_id: PeerId, reason: Option<&str>) {
let mut connections = self.connections.write();
if let Some(peer_info) = connections.info_by_peer.remove(&peer_id) {
if let Some(reason) = reason {
if let (&Some(ref client_version), &Some(ref remote_address)) = (&peer_info.client_version, &peer_info.remote_address) {
debug!(target: "sub-libp2p", "Disconnected peer {} (version: {}, address: {}). {}", peer_id, client_version, remote_address, reason);
} else {
debug!(target: "sub-libp2p", "Disconnected peer {}. {}", peer_id, reason);
}
}
trace!(target: "sub-libp2p", "Destroying peer #{} {:?} ; \
kademlia = {:?} ; num_protos = {:?}", peer_id, peer_info.id,
peer_info.kad_connec.is_alive(),
@@ -1,22 +1,22 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// This file is part of Substrate.
// Polkadot is free software: you can redistribute it and/or modify
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.?
use bytes::Bytes;
use {Error, ErrorKind, NetworkConfiguration, NetworkProtocolHandler};
use {NonReservedPeerMode, NetworkContext, PeerId, ProtocolId};
use {NonReservedPeerMode, NetworkContext, Severity, PeerId, ProtocolId};
use parking_lot::{Mutex, RwLock};
use libp2p;
use libp2p::multiaddr::{AddrComponent, Multiaddr};
@@ -299,8 +299,7 @@ struct NetworkContextImpl {
}
impl NetworkContext for NetworkContextImpl {
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>)
-> Result<(), Error> {
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) {
self.send_protocol(self.protocol, peer, packet_id, data)
}
@@ -310,7 +309,7 @@ impl NetworkContext for NetworkContextImpl {
peer: PeerId,
packet_id: PacketId,
data: Vec<u8>
) -> Result<(), Error> {
) {
debug_assert!(self.inner.protocols.read().has_protocol(protocol),
"invalid protocol id requested in the API of the libp2p networking");
// TODO: could be "optimized" by building `message` only after checking the validity of
@@ -318,10 +317,12 @@ impl NetworkContext for NetworkContextImpl {
let mut message = Bytes::with_capacity(1 + data.len());
message.extend_from_slice(&[packet_id]);
message.extend_from_slice(&data);
self.inner.network_state.send(protocol, peer, message)
if self.inner.network_state.send(protocol, peer, message).is_err() {
self.inner.network_state.drop_peer(peer, Some("Sending to peer failed"));
}
}
fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
fn respond(&self, packet_id: PacketId, data: Vec<u8>) {
if let Some(peer) = self.current_peer {
self.send_protocol(self.protocol, peer, packet_id, data)
} else {
@@ -329,14 +330,19 @@ impl NetworkContext for NetworkContextImpl {
}
}
fn disable_peer(&self, peer: PeerId, reason: &str) {
debug!(target: "sub-libp2p", "Request to disable peer {} for reason {}", peer, reason);
self.inner.network_state.disable_peer(peer, reason);
}
fn disconnect_peer(&self, peer: PeerId) {
debug!(target: "sub-libp2p", "Request to disconnect peer {}", peer);
self.inner.network_state.disconnect_peer(peer);
fn report_peer(&self, peer: PeerId, reason: Severity) {
if let Some(info) = self.inner.network_state.peer_info(peer) {
if let (Some(client_version), Some(remote_address)) = (info.client_version, info.remote_address) {
info!(target: "sub-libp2p", "Peer {} ({} {}) reported by client: {}", peer, remote_address, client_version, reason);
} else {
info!(target: "sub-libp2p", "Peer {} reported by client: {}", peer, reason);
}
}
match reason {
Severity::Bad(reason) => self.inner.network_state.disable_peer(peer, reason),
Severity::Useless(reason) => self.inner.network_state.drop_peer(peer, Some(reason)),
Severity::Timeout => self.inner.network_state.drop_peer(peer, Some("Timeout waiting for response")),
}
}
fn is_expired(&self) -> bool {
@@ -768,9 +774,12 @@ fn handle_custom_connection(
impl Drop for ProtoDisconnectGuard {
fn drop(&mut self) {
debug!(target: "sub-libp2p", "Node {:?} with peer ID {} \
through protocol {:?} disconnected", self.node_id, self.peer_id,
self.protocol);
debug!(target: "sub-libp2p",
"Node {:?} with peer ID {} through protocol {:?} disconnected",
self.node_id,
self.peer_id,
self.protocol
);
self.handler.disconnected(&NetworkContextImpl {
inner: self.inner.clone(),
protocol: self.protocol,
@@ -779,7 +788,7 @@ fn handle_custom_connection(
// When any custom protocol drops, we drop the peer entirely.
// TODO: is this correct?
self.inner.network_state.disconnect_peer(self.peer_id);
self.inner.network_state.drop_peer(self.peer_id, Some("Remote end disconnected"));
}
}
@@ -1271,8 +1280,7 @@ fn ping_all<T, St, C>(
let fut = pinger
.get_or_dial(&swarm_controller, &addr, transport.clone())
.and_then(move |mut p| {
trace!(target: "sub-libp2p",
"Pinging peer #{} aka. {:?}", peer, peer_id);
trace!(target: "sub-libp2p", "Pinging peer #{} aka. {:?}", peer, peer_id);
p.ping()
.map(|()| peer_id)
.map_err(|err| IoError::new(IoErrorKind::Other, err))
@@ -1282,16 +1290,14 @@ fn ping_all<T, St, C>(
.then(move |val|
match val {
Err(err) => {
trace!(target: "sub-libp2p",
"Error while pinging #{:?} => {:?}", peer, err);
shared.network_state.disconnect_peer(peer);
trace!(target: "sub-libp2p", "Error while pinging #{:?} => {:?}", peer, err);
shared.network_state.drop_peer(peer, None); // None so that we don't print messages on such low-level issues.
// Return Ok, otherwise we would close the ping service
Ok(())
},
Ok(peer_id) => {
let elapsed = ping_start_time.elapsed();
trace!(target: "sub-libp2p", "Pong from #{:?} in {:?}",
peer, elapsed);
trace!(target: "sub-libp2p", "Pong from #{:?} in {:?}", peer, elapsed);
shared.network_state.report_ping_duration(peer, elapsed);
shared.kad_system.update_kbuckets(peer_id);
Ok(())
@@ -1,18 +1,18 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// This file is part of Substrate.
// Polkadot is free software: you can redistribute it and/or modify
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.?
use futures::{Async, future, Future, Poll, stream, Stream, sync::mpsc};
use std::io::Error as IoError;
@@ -1,19 +1,20 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// This file is part of Substrate.
// Parity is free software: you can redistribute it and/or modify
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::fmt;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
@@ -213,22 +214,42 @@ impl NetworkConfiguration {
}
}
/// The severity of misbehaviour of a peer that is reported.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Severity<'a> {
/// Peer is timing out. Could be bad connectivity of overload of work on either of our sides.
Timeout,
/// Peer has been notably useless. E.g. unable to answer a request that we might reasonably consider
/// it could answer.
Useless(&'a str),
/// Peer has behaved in an invalid manner. This doesn't necessarily need to be Byzantine, but peer
/// must have taken concrete action in order to behave in such a way which is wantanly invalid.
Bad(&'a str),
}
impl<'a> fmt::Display for Severity<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
Severity::Timeout => write!(fmt, "Timeout"),
Severity::Useless(r) => write!(fmt, "Useless ({})", r),
Severity::Bad(r) => write!(fmt, "Bad ({})", r),
}
}
}
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub trait NetworkContext {
/// Send a packet over the network to another peer.
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>);
/// Send a packet over the network to another peer using specified protocol.
fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>);
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
fn respond(&self, packet_id: PacketId, data: Vec<u8>);
/// Disconnect a peer and prevent it from connecting again.
fn disable_peer(&self, peer: PeerId, reason: &str);
/// Disconnect peer. Reconnect can be attempted later.
fn disconnect_peer(&self, peer: PeerId);
/// Report peer. Depending on the report, peer may be disconnected and possibly banned.
fn report_peer(&self, peer: PeerId, reason: Severity);
/// Check if the session is still active.
fn is_expired(&self) -> bool;
@@ -250,24 +271,20 @@ pub trait NetworkContext {
}
impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext {
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) {
(**self).send(peer, packet_id, data)
}
fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) {
(**self).send_protocol(protocol, peer, packet_id, data)
}
fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
fn respond(&self, packet_id: PacketId, data: Vec<u8>) {
(**self).respond(packet_id, data)
}
fn disable_peer(&self, peer: PeerId, reason: &str) {
(**self).disable_peer(peer, reason)
}
fn disconnect_peer(&self, peer: PeerId) {
(**self).disconnect_peer(peer)
fn report_peer(&self, peer: PeerId, reason: Severity) {
(**self).report_peer(peer, reason)
}
fn is_expired(&self) -> bool {
@@ -333,42 +350,42 @@ impl NonReservedPeerMode {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IpFilter {
pub predefined: AllowIP,
pub custom_allow: Vec<IpNetwork>,
pub custom_block: Vec<IpNetwork>,
pub predefined: AllowIP,
pub custom_allow: Vec<IpNetwork>,
pub custom_block: Vec<IpNetwork>,
}
impl Default for IpFilter {
fn default() -> Self {
IpFilter {
predefined: AllowIP::All,
custom_allow: vec![],
custom_block: vec![],
}
}
fn default() -> Self {
IpFilter {
predefined: AllowIP::All,
custom_allow: vec![],
custom_block: vec![],
}
}
}
impl IpFilter {
/// Attempt to parse the peer mode from a string.
pub fn parse(s: &str) -> Result<IpFilter, IpNetworkError> {
let mut filter = IpFilter::default();
for f in s.split_whitespace() {
match f {
"all" => filter.predefined = AllowIP::All,
"private" => filter.predefined = AllowIP::Private,
"public" => filter.predefined = AllowIP::Public,
"none" => filter.predefined = AllowIP::None,
custom => {
if custom.starts_with("-") {
filter.custom_block.push(IpNetwork::from_str(&custom.to_owned().split_off(1))?)
} else {
filter.custom_allow.push(IpNetwork::from_str(custom)?)
}
}
}
}
Ok(filter)
}
/// Attempt to parse the peer mode from a string.
pub fn parse(s: &str) -> Result<IpFilter, IpNetworkError> {
let mut filter = IpFilter::default();
for f in s.split_whitespace() {
match f {
"all" => filter.predefined = AllowIP::All,
"private" => filter.predefined = AllowIP::Private,
"public" => filter.predefined = AllowIP::Public,
"none" => filter.predefined = AllowIP::None,
custom => {
if custom.starts_with("-") {
filter.custom_block.push(IpNetwork::from_str(&custom.to_owned().split_off(1))?)
} else {
filter.custom_allow.push(IpNetwork::from_str(custom)?)
}
}
}
}
Ok(filter)
}
}
/// IP fiter
@@ -380,6 +397,6 @@ pub enum AllowIP {
Private,
/// Connect to public network only
Public,
/// Block all addresses
None,
/// Block all addresses
None,
}
@@ -1,18 +1,18 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// This file is part of Substrate.
// Polkadot is free software: you can redistribute it and/or modify
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.?
use libp2p::{self, Transport, mplex, secio, yamux};
use libp2p::core::{MuxedTransport, either, upgrade};
@@ -73,9 +73,9 @@ impl NetworkProtocolHandler for TestProtocol {
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
if self.drop_session {
io.disconnect_peer(*peer)
io.report_peer(*peer, Severity::Bad("We are evil and just want to drop"))
} else {
io.respond(33, "hello".to_owned().into_bytes()).unwrap();
io.respond(33, "hello".to_owned().into_bytes());
}
}
+15 -11
View File
@@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use parking_lot::{Condvar, Mutex, RwLock};
use client::{BlockOrigin, BlockStatus, ImportResult};
use network_libp2p::PeerId;
use network_libp2p::{PeerId, Severity};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero};
@@ -202,9 +202,9 @@ trait SyncLinkApi<B: BlockT> {
/// Maintain sync.
fn maintain_sync(&mut self);
/// Disconnect from peer.
fn disconnect(&mut self, peer_id: PeerId);
fn useless_peer(&mut self, peer_id: PeerId, reason: &str);
/// Disconnect from peer and restart sync.
fn disconnect_and_restart(&mut self, peer_id: PeerId);
fn note_useless_and_restart_sync(&mut self, peer_id: PeerId, reason: &str);
/// Restart sync.
fn restart(&mut self);
}
@@ -357,11 +357,15 @@ fn process_import_result<'a, B: BlockT>(
1
},
Err(BlockImportError::Disconnect(peer_id)) => {
link.disconnect(peer_id);
// TODO: FIXME: @arkpar BlockImport shouldn't be trying to manage the peer set.
// This should contain an actual reason.
link.useless_peer(peer_id, "Import result was stated Disconnect");
0
},
Err(BlockImportError::DisconnectAndRestart(peer_id)) => {
link.disconnect_and_restart(peer_id);
// TODO: FIXME: @arkpar BlockImport shouldn't be trying to manage the peer set.
// This should contain an actual reason.
link.note_useless_and_restart_sync(peer_id, "Import result was stated DisconnectAndRestart");
0
},
Err(BlockImportError::Restart) => {
@@ -404,13 +408,13 @@ impl<'a, B: 'static + BlockT, E: ExecuteInContext<B>> SyncLinkApi<B> for SyncLin
self.with_sync(|sync, protocol| sync.maintain_sync(protocol))
}
fn disconnect(&mut self, peer_id: PeerId) {
self.with_sync(|_, protocol| protocol.disconnect_peer(peer_id))
fn useless_peer(&mut self, peer_id: PeerId, reason: &str) {
self.with_sync(|_, protocol| protocol.report_peer(peer_id, Severity::Useless(reason)))
}
fn disconnect_and_restart(&mut self, peer_id: PeerId) {
fn note_useless_and_restart_sync(&mut self, peer_id: PeerId, reason: &str) {
self.with_sync(|sync, protocol| {
protocol.disconnect_peer(peer_id);
protocol.report_peer(peer_id, Severity::Useless(reason)); // is this actually malign or just useless?
sync.restart(protocol);
})
}
@@ -486,8 +490,8 @@ pub mod tests {
fn chain(&self) -> &Client<Block> { &*self.chain }
fn block_imported(&mut self, _hash: &Hash, _number: NumberFor<Block>) { self.imported += 1; }
fn maintain_sync(&mut self) { self.maintains += 1; }
fn disconnect(&mut self, _peer_id: PeerId) { self.disconnects += 1; }
fn disconnect_and_restart(&mut self, _peer_id: PeerId) { self.disconnects += 1; self.restarts += 1; }
fn useless_peer(&mut self, _: PeerId, _: &str) { self.disconnects += 1; }
fn note_useless_and_restart_sync(&mut self, _: PeerId, _: &str) { self.disconnects += 1; self.restarts += 1; }
fn restart(&mut self) { self.restarts += 1; }
}
+7 -13
View File
@@ -14,17 +14,15 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
use network_libp2p::{NetworkContext, PeerId, Error as NetworkError, SessionInfo};
use network_libp2p::{NetworkContext, Severity, PeerId, SessionInfo};
/// IO interface for the syncing handler.
/// Provides peer connection management and an interface to the blockchain client.
pub trait SyncIo {
/// Disable a peer
fn disable_peer(&mut self, peer_id: PeerId, reason: &str);
/// Disconnect peer
fn disconnect_peer(&mut self, peer_id: PeerId);
/// Report a peer for misbehaviour.
fn report_peer(&mut self, peer_id: PeerId, reason: Severity);
/// Send a packet to a peer.
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) -> Result<(), NetworkError>;
fn send(&mut self, peer_id: PeerId, data: Vec<u8>);
/// Returns peer identifier string
fn peer_info(&self, peer_id: PeerId) -> String {
peer_id.to_string()
@@ -50,15 +48,11 @@ impl<'s> NetSyncIo<'s> {
}
impl<'s> SyncIo for NetSyncIo<'s> {
fn disable_peer(&mut self, peer_id: PeerId, reason: &str) {
self.network.disable_peer(peer_id, reason);
fn report_peer(&mut self, peer_id: PeerId, reason: Severity) {
self.network.report_peer(peer_id, reason);
}
fn disconnect_peer(&mut self, peer_id: PeerId) {
self.network.disconnect_peer(peer_id);
}
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) -> Result<(), NetworkError>{
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) {
self.network.send(peer_id, 0, data)
}
+1 -1
View File
@@ -59,7 +59,7 @@ pub use service::{Service, FetchFuture, ConsensusService, BftMessageStream,
TransactionPool, Params, ManageNetwork, SyncProvider};
pub use protocol::{ProtocolStatus, PeerInfo, Context};
pub use sync::{Status as SyncStatus, SyncState};
pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, PeerId, ProtocolId, ConnectionFilter, ConnectionDirection};
pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, PeerId, ProtocolId, ConnectionFilter, ConnectionDirection, Severity};
pub use message::{generic as generic_message, RequestId, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal, Status as StatusMessage};
pub use error::Error;
pub use config::{Roles, ProtocolConfig};
+4 -8
View File
@@ -28,7 +28,7 @@ use client;
use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest};
use io::SyncIo;
use message;
use network_libp2p::PeerId;
use network_libp2p::{Severity, PeerId};
use service;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
@@ -137,8 +137,7 @@ impl<B: BlockT, E> OnDemand<B, E> where
let request = match core.remove(peer, request_id) {
Some(request) => request,
None => {
trace!(target: "sync", "Invalid remote {} response from peer {}", rtype, peer);
io.disconnect_peer(peer);
io.report_peer(peer, Severity::Bad(&format!("Invalid remote {} response from peer", rtype)));
core.remove_peer(peer);
return;
},
@@ -147,9 +146,7 @@ impl<B: BlockT, E> OnDemand<B, E> where
let retry_request_data = match try_accept(request) {
Accept::Ok => None,
Accept::CheckFailed(error, retry_request_data) => {
trace!(target: "sync", "Failed to check remote {} response from peer {}: {}", rtype, peer, error);
io.disconnect_peer(peer);
io.report_peer(peer, Severity::Bad(&format!("Failed to check remote {} response from peer: {}", rtype, error)));
core.remove_peer(peer);
Some(retry_request_data)
},
@@ -187,8 +184,7 @@ impl<B, E> OnDemandService<B> for OnDemand<B, E> where
fn maintain_peers(&self, io: &mut SyncIo) {
let mut core = self.core.lock();
for bad_peer in core.maintain_peers() {
trace!(target: "sync", "Remote request timeout for peer {}", bad_peer);
io.disconnect_peer(bad_peer);
io.report_peer(bad_peer, Severity::Timeout);
}
core.dispatch();
}
+15 -31
View File
@@ -21,7 +21,7 @@ use std::time;
use parking_lot::RwLock;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, As};
use runtime_primitives::generic::BlockId;
use network_libp2p::PeerId;
use network_libp2p::{PeerId, Severity};
use codec::{Encode, Decode};
use message::{self, Message};
@@ -43,7 +43,6 @@ pub (crate) const CURRENT_VERSION: u32 = 1;
/// Current packet count.
pub (crate) const CURRENT_PACKET_COUNT: u8 = 1;
// Maximum allowed entries in `BlockResponse`
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
@@ -110,11 +109,8 @@ pub trait Context<B: BlockT> {
/// Get a reference to the client.
fn client(&self) -> &::chain::Client<B>;
/// Disable a peer
fn disable_peer(&mut self, peer_id: PeerId, reason: &str);
/// Disconnect peer
fn disconnect_peer(&mut self, peer_id: PeerId);
/// Point out that a peer has been malign or irresponsible or appeared lazy.
fn report_peer(&mut self, peer_id: PeerId, reason: Severity);
/// Get peer info.
fn peer_info(&self, peer: PeerId) -> Option<PeerInfo<B>>;
@@ -142,12 +138,9 @@ impl<'a, B: BlockT + 'a> ProtocolContext<'a, B> {
send_message(&self.context_data.peers, self.io, peer_id, message)
}
pub fn disable_peer(&mut self, peer_id: PeerId, reason: &str) {
self.io.disable_peer(peer_id, reason);
}
pub fn disconnect_peer(&mut self, peer_id: PeerId) {
self.io.disconnect_peer(peer_id)
/// Point out that a peer has been malign or irresponsible or appeared lazy.
pub fn report_peer(&mut self, peer_id: PeerId, reason: Severity) {
self.io.report_peer(peer_id, reason);
}
/// Get peer info.
@@ -168,12 +161,8 @@ impl<'a, B: BlockT + 'a> Context<B> for ProtocolContext<'a, B> {
ProtocolContext::send_message(self, peer_id, message);
}
fn disable_peer(&mut self, peer_id: PeerId, reason: &str) {
ProtocolContext::disable_peer(self, peer_id, reason);
}
fn disconnect_peer(&mut self, peer_id: PeerId) {
ProtocolContext::disconnect_peer(self, peer_id);
fn report_peer(&mut self, peer_id: PeerId, reason: Severity) {
ProtocolContext::report_peer(self, peer_id, reason);
}
fn peer_info(&self, peer_id: PeerId) -> Option<PeerInfo<B>> {
@@ -244,7 +233,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
Some(m) => m,
None => {
trace!(target: "sync", "Invalid packet from {}", peer_id);
io.disable_peer(peer_id, "Peer sent us a packet with invalid format");
io.report_peer(peer_id, Severity::Bad("Peer sent us a packet with invalid format"));
return;
}
};
@@ -260,12 +249,12 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
match mem::replace(&mut peer.block_request, None) {
Some(r) => r,
None => {
io.disable_peer(peer_id, "Unexpected response packet received from peer");
io.report_peer(peer_id, Severity::Bad("Unexpected response packet received from peer"));
return;
}
}
} else {
io.disable_peer(peer_id, "Unexpected packet received from peer");
io.report_peer(peer_id, Severity::Bad("Unexpected packet received from peer"));
return;
}
};
@@ -385,7 +374,6 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
.chain(handshaking_peers.iter()) {
if (tick - *timestamp).as_secs() > REQUEST_TIMEOUT_SEC {
trace!(target: "sync", "Timeout {}", peer_id);
io.disconnect_peer(*peer_id);
aborting.push(*peer_id);
}
}
@@ -393,7 +381,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
self.specialization.write().maintain_peers(&mut ProtocolContext::new(&self.context_data, io));
for p in aborting {
io.disconnect_peer(p);
io.report_peer(p, Severity::Timeout);
}
}
@@ -424,11 +412,11 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
return;
}
if status.genesis_hash != self.genesis_hash {
io.disable_peer(peer_id, &format!("Peer is on different chain (our genesis: {} theirs: {})", self.genesis_hash, status.genesis_hash));
io.report_peer(peer_id, Severity::Bad(&format!("Peer is on different chain (our genesis: {} theirs: {})", self.genesis_hash, status.genesis_hash)));
return;
}
if status.version != CURRENT_VERSION {
io.disable_peer(peer_id, &format!("Peer using unsupported protocol version {}", status.version));
io.report_peer(peer_id, Severity::Bad(&format!("Peer using unsupported protocol version {}", status.version)));
return;
}
@@ -625,11 +613,7 @@ fn send_message<B: BlockT>(peers: &RwLock<HashMap<PeerId, Peer<B>>>, io: &mut Sy
},
_ => (),
}
let data = message.encode();
if let Err(e) = io.send(peer_id, data) {
debug!(target:"sync", "Error sending message: {:?}", e);
io.disconnect_peer(peer_id);
}
io.send(peer_id, message.encode());
}
/// Hash a message.
+7 -8
View File
@@ -17,7 +17,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use protocol::Context;
use network_libp2p::PeerId;
use network_libp2p::{Severity, PeerId};
use client::{BlockStatus, BlockOrigin, ClientInfo};
use client::error::Error as ClientError;
use blocks::{self, BlockCollection};
@@ -124,13 +124,13 @@ impl<B: BlockT> ChainSync<B> {
match (block_status(&*protocol.client(), &*self.import_queue, info.best_hash), info.best_number) {
(Err(e), _) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
protocol.disconnect_peer(peer_id);
protocol.report_peer(peer_id, Severity::Useless(&format!("Error legimimately reading blockchain status: {:?}", e)));
},
(Ok(BlockStatus::KnownBad), _) => {
protocol.disable_peer(peer_id, &format!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number));
protocol.report_peer(peer_id, Severity::Bad(&format!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number)));
},
(Ok(BlockStatus::Unknown), b) if b == As::sa(0) => {
protocol.disable_peer(peer_id, &format!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number));
protocol.report_peer(peer_id, Severity::Bad(&format!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number)));
},
(Ok(BlockStatus::Unknown), _) => {
let our_best = self.best_queued_number;
@@ -211,19 +211,18 @@ impl<B: BlockT> ChainSync<B> {
},
Ok(_) => { // genesis mismatch
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", peer_id);
protocol.disable_peer(peer_id, "Ancestry search: genesis mismatch for peer");
protocol.report_peer(peer_id, Severity::Bad("Ancestry search: genesis mismatch for peer"));
return;
},
Err(e) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
protocol.disconnect_peer(peer_id);
protocol.report_peer(peer_id, Severity::Useless(&format!("Error answering legitimate blockchain query: {:?}", e)));
return;
}
}
},
None => {
trace!(target:"sync", "Invalid response when searching for ancestor from {}", peer_id);
protocol.disconnect_peer(peer_id);
protocol.report_peer(peer_id, Severity::Bad("Invalid response when searching for ancestor"));
return;
}
}
+3 -8
View File
@@ -28,7 +28,7 @@ use io::SyncIo;
use protocol::{Context, Protocol};
use config::ProtocolConfig;
use service::TransactionPool;
use network_libp2p::{PeerId, SessionInfo, Error as NetworkError};
use network_libp2p::{PeerId, SessionInfo, Severity};
use keyring::Keyring;
use codec::Encode;
use import_queue::tests::SyncImportQueue;
@@ -81,11 +81,7 @@ impl<'p> Drop for TestIo<'p> {
}
impl<'p> SyncIo for TestIo<'p> {
fn disable_peer(&mut self, peer_id: PeerId, _reason: &str) {
self.disconnect_peer(peer_id);
}
fn disconnect_peer(&mut self, peer_id: PeerId) {
fn report_peer(&mut self, peer_id: PeerId, _reason: Severity) {
self.to_disconnect.insert(peer_id);
}
@@ -93,12 +89,11 @@ impl<'p> SyncIo for TestIo<'p> {
false
}
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) -> Result<(), NetworkError> {
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) {
self.packets.push(TestPacket {
data: data,
recipient: peer_id,
});
Ok(())
}
fn peer_info(&self, peer_id: PeerId) -> String {