libp2p-next (#3076)

* Changes for the next libp2p release:

  * Updates to the Kademlia APIs.
  * Updated imports due to the extracted libp2p-swarm crate.
  * ...

Still pending at least the following:

  * rust-libp2p/#1189
  * rust-libp2p/#1191
  * rust-libp2p/#1194

* Use Quorum::One.

The previous choice was apparently arbitrary.

* Use libp2p-0.11 from crates.io. Address feedback.

* Correct imports after merge.
This commit is contained in:
Roman Borschel
2019-07-24 17:32:25 +02:00
committed by Pierre Krieger
parent 5d58d583e3
commit 343f4a2a50
14 changed files with 264 additions and 202 deletions
+120 -74
View File
@@ -48,18 +48,21 @@
use futures::prelude::*;
use futures_timer::Delay;
use futures03::{compat::Compat, TryFutureExt as _};
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey};
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p::core::swarm::PollParameters;
use libp2p::core::{ConnectedPoint, Multiaddr, PeerId, PublicKey};
use libp2p::swarm::{ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::kad::{Kademlia, KademliaEvent, Quorum, Record};
use libp2p::kad::GetClosestPeersError;
use libp2p::kad::record::store::MemoryStore;
#[cfg(not(target_os = "unknown"))]
use libp2p::core::{swarm::toggle::Toggle, nodes::Substream, muxing::StreamMuxerBox};
use libp2p::kad::{GetValueResult, Kademlia, KademliaOut, PutValueResult};
use libp2p::{swarm::toggle::Toggle};
#[cfg(not(target_os = "unknown"))]
use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox};
#[cfg(not(target_os = "unknown"))]
use libp2p::mdns::{Mdns, MdnsEvent};
use libp2p::multihash::Multihash;
use libp2p::multiaddr::Protocol;
use log::{debug, info, trace, warn};
use std::{cmp, collections::VecDeque, num::NonZeroU8, time::Duration};
use std::{cmp, collections::VecDeque, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
@@ -68,7 +71,7 @@ pub struct DiscoveryBehaviour<TSubstream> {
/// reserved nodes.
user_defined: Vec<(PeerId, Multiaddr)>,
/// Kademlia requests and answers.
kademlia: Kademlia<TSubstream>,
kademlia: Kademlia<TSubstream, MemoryStore>,
/// Discovers nodes on the local network.
#[cfg(not(target_os = "unknown"))]
mdns: Toggle<Mdns<Substream<StreamMuxerBox>>>,
@@ -98,7 +101,9 @@ impl<TSubstream> DiscoveryBehaviour<TSubstream> {
warn!(target: "sub-libp2p", "mDNS is not available on this platform");
}
let mut kademlia = Kademlia::new(local_public_key.clone().into_peer_id());
let local_id = local_public_key.clone().into_peer_id();
let store = MemoryStore::new(local_id.clone());
let mut kademlia = Kademlia::new(local_id.clone(), store);
for (peer_id, addr) in &user_defined {
kademlia.add_address(peer_id, addr.clone());
}
@@ -155,8 +160,7 @@ impl<TSubstream> DiscoveryBehaviour<TSubstream> {
///
/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
pub fn get_value(&mut self, key: &Multihash) {
self.kademlia.get_value(key, NonZeroU8::new(10)
.expect("Casting 10 to NonZeroU8 should succeed; qed"));
self.kademlia.get_record(key, Quorum::One)
}
/// Start putting a record into the DHT. Other nodes can later fetch that value with
@@ -164,15 +168,24 @@ impl<TSubstream> DiscoveryBehaviour<TSubstream> {
///
/// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated.
pub fn put_value(&mut self, key: Multihash, value: Vec<u8>) {
self.kademlia.put_value(key, value);
self.kademlia.put_record(Record::new(key, value), Quorum::All);
}
}
/// Event generated by the `DiscoveryBehaviour`.
pub enum DiscoveryOut {
/// We have discovered a node. Can be called multiple times with the same identity.
/// The address of a peer has been added to the Kademlia routing table.
///
/// Can be called multiple times with the same identity.
Discovered(PeerId),
/// A peer connected to this node for whom no listen address is known.
///
/// In order for the peer to be added to the Kademlia routing table, a known
/// listen address must be added via [`DiscoveryBehaviour::add_self_reported_address`],
/// e.g. obtained through the `identify` protocol.
UnroutablePeer(PeerId),
/// The DHT yeided results for the record request, grouped in (key, value) pairs.
ValueFound(Vec<(Multihash, Vec<u8>)>),
@@ -190,7 +203,7 @@ impl<TSubstream> NetworkBehaviour for DiscoveryBehaviour<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
type ProtocolsHandler = <Kademlia<TSubstream> as NetworkBehaviour>::ProtocolsHandler;
type ProtocolsHandler = <Kademlia<TSubstream, MemoryStore> as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = DiscoveryOut;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
@@ -272,9 +285,11 @@ where
let random_peer_id = PeerId::random();
debug!(target: "sub-libp2p", "Libp2p <= Starting random Kademlia request for \
{:?}", random_peer_id);
self.kademlia.find_node(random_peer_id);
// Reset the `Delay` to the next random.
self.kademlia.get_closest_peers(random_peer_id);
// Schedule the next random query with exponentially increasing delay,
// capped at 60 seconds.
self.next_kad_random_query = Delay::new(self.duration_to_next_kad).compat();
self.duration_to_next_kad = cmp::min(self.duration_to_next_kad * 2,
Duration::from_secs(60));
@@ -290,50 +305,74 @@ where
loop {
match self.kademlia.poll(params) {
Async::NotReady => break,
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => {
match ev {
KademliaOut::Discovered { .. } => {}
KademliaOut::KBucketAdded { peer_id, .. } => {
let ev = DiscoveryOut::Discovered(peer_id);
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaOut::FindNodeResult { key, closer_peers } => {
trace!(target: "sub-libp2p", "Libp2p => Query for {:?} yielded {:?} results",
key, closer_peers.len());
if closer_peers.is_empty() && self.num_connections != 0 {
warn!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \
results");
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => match ev {
KademliaEvent::UnroutablePeer { peer, .. } => {
let ev = DiscoveryOut::UnroutablePeer(peer);
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::RoutingUpdated { peer, .. } => {
let ev = DiscoveryOut::Discovered(peer);
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::GetClosestPeersResult(res) => {
match res {
Err(GetClosestPeersError::Timeout { key, peers }) => {
warn!(target: "sub-libp2p",
"Libp2p => Query for {:?} timed out with {:?} results",
key, peers.len());
},
Ok(ok) => {
trace!(target: "sub-libp2p",
"Libp2p => Query for {:?} yielded {:?} results",
ok.key, ok.peers.len());
if ok.peers.is_empty() && self.num_connections != 0 {
warn!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \
results");
}
}
}
KademliaOut::GetValueResult(res) => {
let ev = match res {
GetValueResult::Found { results } => {
let results = results
.into_iter()
.map(|r| (r.key, r.value))
.collect();
}
KademliaEvent::GetRecordResult(res) => {
let ev = match res {
Ok(ok) => {
let results = ok.records
.into_iter()
.map(|r| (r.key, r.value))
.collect();
DiscoveryOut::ValueFound(results)
}
GetValueResult::NotFound { key, .. } => {
DiscoveryOut::ValueNotFound(key)
}
};
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
DiscoveryOut::ValueFound(results)
}
Err(e) => {
DiscoveryOut::ValueNotFound(e.into_key())
}
};
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::PutRecordResult(res) => {
let ev = match res {
Ok(ok) => DiscoveryOut::ValuePut(ok.key),
Err(e) => {
DiscoveryOut::ValuePutFailed(e.into_key())
}
};
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::RepublishRecordResult(res) => {
match res {
Ok(ok) => debug!(target: "sub-libp2p",
"Libp2p => Record republished: {:?}",
ok.key),
Err(e) => warn!(target: "sub-libp2p",
"Libp2p => Republishing of record {:?} failed with: {:?}",
e.key(), e)
}
KademliaOut::PutValueResult(res) => {
let ev = match res {
PutValueResult::Ok{ key, .. } => {
DiscoveryOut::ValuePut(key)
}
PutValueResult::Err { key, .. } => {
DiscoveryOut::ValuePutFailed(key)
}
};
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
// We never start any other type of query.
KademliaOut::GetProvidersResult { .. } => {}
}
KademliaEvent::Discovered { .. } => {
// We are not interested in these events at the moment.
}
// We never start any other type of query.
e => {
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
}
},
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
@@ -384,9 +423,10 @@ mod tests {
use futures::prelude::*;
use libp2p::identity::Keypair;
use libp2p::Multiaddr;
use libp2p::core::{upgrade, Swarm};
use libp2p::core::upgrade;
use libp2p::core::transport::{Transport, MemoryTransport};
use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt};
use libp2p::swarm::Swarm;
use std::collections::HashSet;
use super::{DiscoveryBehaviour, DiscoveryOut};
@@ -428,28 +468,34 @@ mod tests {
.collect::<HashSet<_>>()
}).collect::<Vec<_>>();
let fut = futures::future::poll_fn(move || -> Result<_, ()> {
loop {
let mut keep_polling = false;
let fut = futures::future::poll_fn::<_, (), _>(move || {
'polling: loop {
for swarm_n in 0..swarms.len() {
if let Async::Ready(Some(DiscoveryOut::Discovered(other))) =
swarms[swarm_n].0.poll().unwrap() {
if to_discover[swarm_n].remove(&other) {
keep_polling = true;
// Call `add_self_reported_address` to simulate identify happening.
let addr = swarms.iter()
.find(|s| *Swarm::local_peer_id(&s.0) == other)
.unwrap()
.1.clone();
swarms[swarm_n].0.add_self_reported_address(&other, addr);
match swarms[swarm_n].0.poll().unwrap() {
Async::Ready(Some(e)) => {
match e {
DiscoveryOut::UnroutablePeer(other) => {
// Call `add_self_reported_address` to simulate identify happening.
let addr = swarms.iter().find_map(|(s, a)|
if s.local_peer_id == other {
Some(a.clone())
} else {
None
})
.unwrap();
swarms[swarm_n].0.add_self_reported_address(&other, addr);
},
DiscoveryOut::Discovered(other) => {
to_discover[swarm_n].remove(&other);
}
_ => {}
}
continue 'polling
}
_ => {}
}
}
if !keep_polling {
break;
}
break
}
if to_discover.iter().all(|l| l.is_empty()) {