More networking fixes (#384)

* Don't connect to ourselves for Kademlia

* Improve log messages

* Manually drop connections

* Kademlia is now 32 seconds

* Fix potential panic

* Fix other potential panic

* No longer pass the endpoint everywhere
This commit is contained in:
Pierre Krieger
2018-07-19 17:56:39 +02:00
committed by Gav Wood
parent ed7144d281
commit 0fbf66b07e
6 changed files with 171 additions and 152 deletions
@@ -11,7 +11,7 @@ bytes = "0.4"
error-chain = { version = "0.12", default-features = false }
fnv = "1.0"
futures = "0.1"
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "38c9238a74504a6149909eb80f73608f63ab46c7", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "ccbb4361c299491b40e5ca95cfa6716ba31c485c", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
ethkey = { git = "https://github.com/paritytech/parity.git" }
ethereum-types = "0.3"
@@ -52,6 +52,9 @@ pub struct RegisteredProtocolOutput<T> {
/// Id of the protocol.
pub protocol_id: ProtocolId,
/// Endpoint of the connection.
pub endpoint: Endpoint,
/// Version of the protocol that was negotiated.
pub protocol_version: u8,
@@ -123,7 +126,7 @@ where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
self,
socket: C,
protocol_version: Self::UpgradeIdentifier,
_endpoint: Endpoint,
endpoint: Endpoint,
remote_addr: Maf
) -> Self::Future {
let packet_count = self.supported_versions
@@ -214,6 +217,7 @@ where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
let out = RegisteredProtocolOutput {
custom_data: self.custom_data,
protocol_id: self.id,
endpoint,
protocol_version: protocol_version,
outgoing: msg_tx,
incoming: Box::new(incoming),
@@ -323,6 +323,8 @@ impl NetworkState {
/// Adds an address discovered by Kademlia.
/// Note that we don't have to be connected to a peer to add an address.
pub fn add_kad_discovered_addr(&self, node_id: &PeerstorePeerId, addr: Multiaddr) {
trace!(target: "sub-libp2p", "Peer store: adding address {} for {:?}",
addr, node_id);
match self.peerstore {
PeersStorage::Memory(ref mem) =>
mem.peer_or_create(node_id)
@@ -495,6 +497,8 @@ impl NetworkState {
!infos.protocols.iter().any(|(_, conn)| conn.is_alive())
{
peer_by_nodeid.remove(&infos.id);
trace!(target: "sub-libp2p", "Cleaning up expired peer \
#{:?} ({:?})", peer_id, infos.id);
return false;
}
@@ -593,6 +597,14 @@ impl NetworkState {
pub fn disconnect_peer(&self, peer_id: PeerId) {
let mut connections = self.connections.write();
if let Some(peer_info) = connections.info_by_peer.remove(&peer_id) {
trace!(target: "sub-libp2p", "Destroying peer #{} {:?} ; \
kademlia = {:?} ; num_protos = {:?}", peer_id, peer_info.id,
peer_info.kad_connec.is_alive(),
peer_info.protocols.iter().filter(|c| c.1.is_alive()).count());
// TODO: we manually clear the connections as a work-around for
// networking bugs ; normally it should automatically drop
for c in peer_info.protocols.iter() { c.1.clear(); }
peer_info.kad_connec.clear();
let old = connections.peer_by_nodeid.remove(&peer_info.id);
debug_assert_eq!(old, Some(peer_id));
}
@@ -617,6 +629,7 @@ impl NetworkState {
/// of `custom_proto`).
pub fn disable_peer(&self, peer_id: PeerId, reason: &str) {
// TODO: what do we do if the peer is reserved?
// TODO: same logging as in disconnect_peer
let mut connections = self.connections.write();
let peer_info = if let Some(peer_info) = connections.info_by_peer.remove(&peer_id) {
if let (&Some(ref client_version), &Some(ref remote_address)) = (&peer_info.client_version, &peer_info.remote_address) {
@@ -686,6 +699,8 @@ fn accept_connection(
let peer_id = *peer_by_nodeid.entry(node_id.clone()).or_insert_with(|| {
let new_id = next_peer_id.fetch_add(1, atomic::Ordering::Relaxed);
trace!(target: "sub-libp2p", "Creating new peer #{:?} for {:?}",
new_id, node_id);
info_by_peer.insert(new_id, PeerConnectionInfo {
protocols: Vec::new(), // TODO: Vec::with_capacity(num_registered_protocols),
@@ -451,14 +451,13 @@ fn init_thread(
upgrade::map_with_addr(DelayedProtosList(shared), |c, a| FinalUpgrade::Custom(c, a.clone())));
upgrade::apply(out.socket, listener_upgrade, endpoint, client_addr)
}
})
.map(|out, _| (out, Endpoint::Listener));
});
let shared = shared.clone();
libp2p::core::swarm(
upgraded_transport,
move |(upgrade, endpoint), _client_addr|
listener_handle(shared.clone(), upgrade, endpoint)
move |upgrade, _client_addr|
listener_handle(shared.clone(), upgrade)
)
};
@@ -566,13 +565,12 @@ enum FinalUpgrade<C> {
fn listener_handle<'a, C>(
shared: Arc<Shared>,
upgrade: FinalUpgrade<C>,
endpoint: Endpoint,
) -> Box<Future<Item = (), Error = IoError> + 'a>
where C: AsyncRead + AsyncWrite + 'a {
match upgrade {
FinalUpgrade::Kad(controller, kademlia_stream, client_addr) => {
trace!(target: "sub-libp2p", "Opened kademlia substream with \
{:?} as {:?}", client_addr, endpoint);
trace!(target: "sub-libp2p", "Opened kademlia substream with {:?}",
client_addr);
match handle_kademlia_connection(shared, client_addr, controller, kademlia_stream) {
Ok(fut) => Box::new(fut) as Box<_>,
Err(err) => Box::new(future::err(err)) as Box<_>,
@@ -614,7 +612,7 @@ fn listener_handle<'a, C>(
FinalUpgrade::Custom(custom_proto_out, client_addr) => {
// A "custom" protocol is one that is part of substrate and not part of libp2p.
let shared = shared.clone();
let fut = handle_custom_connection(shared, client_addr, endpoint, custom_proto_out);
let fut = handle_custom_connection(shared, client_addr, custom_proto_out);
Box::new(fut) as Box<_>
},
}
@@ -628,9 +626,10 @@ fn handle_kademlia_connection(
kademlia_stream: Box<Stream<Item = KadIncomingRequest, Error = IoError>>
) -> Result<impl Future<Item = (), Error = IoError>, IoError> {
let node_id = p2p_multiaddr_to_node_id(client_addr);
let (_peer_id, kad_connec) = shared.network_state
let (peer_id, kad_connec) = shared.network_state
.kad_connection(node_id.clone())?;
let node_id2 = node_id.clone();
let future = future::loop_fn(kademlia_stream, move |kademlia_stream| {
let shared = shared.clone();
let node_id = node_id.clone();
@@ -655,6 +654,10 @@ fn handle_kademlia_connection(
}
Ok(future::Loop::Continue(rest))
})
}).then(move |val| {
trace!(target: "sub-libp2p", "Closed Kademlia connection \
with #{} {:?} => {:?}", peer_id, node_id2, val);
val
});
Ok(kad_connec.set_until(controller, future))
@@ -708,7 +711,6 @@ fn build_kademlia_response(
fn handle_custom_connection(
shared: Arc<Shared>,
client_addr: Multiaddr,
endpoint: Endpoint,
custom_proto_out: RegisteredProtocolOutput<Arc<NetworkProtocolHandler + Send + Sync>>
) -> impl Future<Item = (), Error = IoError> {
let handler = custom_proto_out.custom_data;
@@ -726,7 +728,7 @@ fn handle_custom_connection(
let (peer_id, unique_connec) = match shared.network_state.custom_proto(
node_id.clone(),
protocol_id,
endpoint,
custom_proto_out.endpoint,
) {
Ok(a) => a,
Err(err) => return future::Either::A(future::err(err.into())),
@@ -834,7 +836,7 @@ fn start_kademlia_discovery<T, To, St, C>(shared: Arc<Shared>, transport: T,
where T: MuxedTransport<Output = TransportOutput<To>> + Clone + 'static,
T::MultiaddrFuture: 'static,
To: AsyncRead + AsyncWrite + 'static,
St: MuxedTransport<Output = (FinalUpgrade<C>, Endpoint)> + Clone + 'static,
St: MuxedTransport<Output = FinalUpgrade<C>> + Clone + 'static,
C: 'static {
let kad_init = shared.kad_system.perform_initialization({
let shared = shared.clone();
@@ -849,7 +851,7 @@ fn start_kademlia_discovery<T, To, St, C>(shared: Arc<Shared>, transport: T,
)
});
let discovery = Interval::new(Instant::now(), Duration::from_secs(30))
let discovery = Interval::new(Instant::now(), Duration::from_secs(32))
// TODO: add a timeout to the lookups
.map_err(|err| IoError::new(IoErrorKind::Other, err))
.and_then({
@@ -901,7 +903,7 @@ fn perform_kademlia_query<T, To, St, C>(
where T: MuxedTransport<Output = TransportOutput<To>> + Clone + 'static,
T::MultiaddrFuture: 'static,
To: AsyncRead + AsyncWrite + 'static,
St: MuxedTransport<Output = (FinalUpgrade<C>, Endpoint)> + Clone + 'static,
St: MuxedTransport<Output = FinalUpgrade<C>> + Clone + 'static,
C: 'static {
// Query the node IDs that are closest to a random ID.
// Note that the randomness doesn't have to be secure, as this only
@@ -925,8 +927,6 @@ fn perform_kademlia_query<T, To, St, C>(
match event {
KadQueryEvent::NewKnownMultiaddrs(peers) => {
for (peer, addrs) in peers {
trace!(target: "sub-libp2p", "Peer store: adding \
addresses {:?} for {:?}", addrs, peer);
for addr in addrs {
shared.network_state.add_kad_discovered_addr(&peer, addr);
}
@@ -950,11 +950,11 @@ fn connect_to_nodes<T, To, St, C>(
where T: MuxedTransport<Output = TransportOutput<To>> + Clone + 'static,
T::MultiaddrFuture: 'static,
To: AsyncRead + AsyncWrite + 'static,
St: MuxedTransport<Output = (FinalUpgrade<C>, Endpoint)> + Clone + 'static,
St: MuxedTransport<Output = FinalUpgrade<C>> + Clone + 'static,
C: 'static {
let num_slots = shared.network_state.should_open_outgoing_custom_connections();
debug!(target: "sub-libp2p", "Opening up to {} outgoing connections",
num_slots);
debug!(target: "sub-libp2p", "Outgoing connections cycle ; opening up to \
{} outgoing connections", num_slots);
for _ in 0 .. num_slots {
// Choose a random peer. We are potentially already connected to
@@ -970,7 +970,6 @@ fn connect_to_nodes<T, To, St, C>(
// Try to dial that node for each registered protocol. Since dialing
// upgrades the connection to use multiplexing, dialing multiple times
// should automatically open multiple substreams.
trace!(target: "sub-libp2p", "Ensuring connection to {:?}", peer);
for proto in shared.protocols.read().0.clone().into_iter() {
open_peer_custom_proto(
shared.clone(),
@@ -997,10 +996,11 @@ fn open_peer_custom_proto<T, To, St, C>(
where T: MuxedTransport<Output = TransportOutput<To>> + Clone + 'static,
T::MultiaddrFuture: 'static,
To: AsyncRead + AsyncWrite + 'static,
St: MuxedTransport<Output = (FinalUpgrade<C>, Endpoint)> + Clone + 'static,
St: MuxedTransport<Output = FinalUpgrade<C>> + Clone + 'static,
C: 'static,
{
// Don't connect to ourselves.
// TODO: remove this eventually
if &expected_peer_id == shared.kad_system.local_peer_id() {
return
}
@@ -1011,7 +1011,7 @@ fn open_peer_custom_proto<T, To, St, C>(
}
let proto_id = proto.id();
let peer_id = expected_peer_id.clone();
let node_id = expected_peer_id.clone();
let shared2 = shared.clone();
let addr: Multiaddr = AddrComponent::P2P(expected_peer_id.clone().into_bytes()).into();
@@ -1039,10 +1039,10 @@ fn open_peer_custom_proto<T, To, St, C>(
upgrade::apply(socket, proto, endpoint, client_addr)
)
})
.and_then(move |out, endpoint, client_addr|
.and_then(move |out, _endpoint, client_addr|
client_addr.map(move |client_addr| {
let out = FinalUpgrade::Custom(out, client_addr.clone());
((out, endpoint), future::ok(client_addr))
(out, future::ok(client_addr))
})
);
@@ -1050,17 +1050,23 @@ fn open_peer_custom_proto<T, To, St, C>(
Duration::from_secs(20));
let with_err = with_timeout
.map_err({
let peer_id = peer_id.clone();
let node_id = node_id.clone();
move |err| {
debug!(target: "sub-libp2p", "Error while dialing \
{:?} with custom proto: {:?}", peer_id, err);
{:?} with custom proto: {:?}", node_id, err);
err
}
});
if let Ok(unique_connec) = shared2.network_state
.custom_proto(peer_id, proto_id, Endpoint::Dialer) {
let _ = unique_connec.1.get_or_dial(&swarm_controller, &addr, with_err);
if let Ok((peer_id, unique_connec)) = shared2.network_state
.custom_proto(node_id.clone(), proto_id, Endpoint::Dialer) {
if !unique_connec.is_alive() {
trace!(target: "sub-libp2p", "Opening connection to #{} {:?} with \
proto {:?}", peer_id, node_id, proto_id);
}
// TODO: this future should be used
let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err);
}
}
@@ -1071,7 +1077,7 @@ fn obtain_kad_connection<T, To, St, C>(shared: Arc<Shared>,
where T: MuxedTransport<Output = TransportOutput<To>> + Clone + 'static,
T::MultiaddrFuture: 'static,
To: AsyncRead + AsyncWrite + 'static,
St: MuxedTransport<Output = (FinalUpgrade<C>, Endpoint)> + Clone + 'static,
St: MuxedTransport<Output = FinalUpgrade<C>> + Clone + 'static,
C: 'static {
let kad_upgrade = shared.kad_upgrade.clone();
let addr: Multiaddr = AddrComponent::P2P(peer_id.clone().into_bytes()).into();
@@ -1083,7 +1089,7 @@ fn obtain_kad_connection<T, To, St, C>(shared: Arc<Shared>,
.and_then(move |(ctrl, fut), _, client_addr| {
client_addr.map(|client_addr| {
let out = FinalUpgrade::Kad(ctrl, fut, client_addr.clone());
((out, Endpoint::Dialer), future::ok(client_addr))
(out, future::ok(client_addr))
})
});
@@ -1130,8 +1136,6 @@ fn process_identify_info(
}
for addr in info.info.listen_addrs.iter() {
trace!(target: "sub-libp2p", "Peer store: adding address {} for {:?}",
addr, node_id);
shared.network_state.add_kad_discovered_addr(&node_id, addr.clone());
}
@@ -1148,7 +1152,7 @@ fn start_pinger<T, To, St, C>(
where T: MuxedTransport<Output = TransportOutput<To>> + Clone + 'static,
T::MultiaddrFuture: 'static,
To: AsyncRead + AsyncWrite + 'static,
St: MuxedTransport<Output = (FinalUpgrade<C>, Endpoint)> + Clone + 'static,
St: MuxedTransport<Output = FinalUpgrade<C>> + Clone + 'static,
C: 'static {
let transport = transport
.and_then(move |out, endpoint, client_addr|
@@ -1157,7 +1161,7 @@ fn start_pinger<T, To, St, C>(
.and_then(move |(ctrl, fut), _, client_addr| {
client_addr.map(|client_addr| {
let out = FinalUpgrade::Ping(ctrl, fut, client_addr.clone());
((out, Endpoint::Dialer), future::ok(client_addr))
(out, future::ok(client_addr))
})
});
@@ -1182,9 +1186,9 @@ fn ping_all<T, St, C>(
transport: T,
swarm_controller: &SwarmController<St>
) -> impl Future<Item = (), Error = IoError>
where T: MuxedTransport<Output = (FinalUpgrade<C>, Endpoint)> + Clone + 'static,
where T: MuxedTransport<Output = FinalUpgrade<C>> + Clone + 'static,
T::MultiaddrFuture: 'static,
St: MuxedTransport<Output = (FinalUpgrade<C>, Endpoint)> + Clone + 'static,
St: MuxedTransport<Output = FinalUpgrade<C>> + Clone + 'static,
C: 'static {
let mut ping_futures = Vec::new();
@@ -1196,7 +1200,7 @@ fn ping_all<T, St, C>(
.get_or_dial(&swarm_controller, &addr, transport.clone())
.and_then(move |mut p| {
trace!(target: "sub-libp2p",
"Pinging active connection with #{} {:?}", peer, peer_id);
"Pinging peer #{} aka. {:?}", peer, peer_id);
p.ping()
.map(|()| peer_id)
.map_err(|err| IoError::new(IoErrorKind::Other, err))
@@ -56,10 +56,6 @@ pub fn build_transport(
.into_connection_reuse();
TransportTimeout::new(base, Duration::from_secs(20))
.map_err(|err| {
debug!(target: "sub-libp2p", "Error in base transport layer: {:?}", err);
err
})
}
/// Specifies whether unencrypted communications are allowed or denied.