Allow bootnodes as IP:PORT, query Peer-id (#386)

* Simplify cli parsing: remove unnecessary match for Version and help: e.exit() manages both for us already
* Allow passing of IP:PORT as bootnodes, then try to discover its PEER_ID
This commit is contained in:
Benjamin Kampmann
2018-07-20 00:43:31 +02:00
committed by GitHub
parent 831810b155
commit e3268de487
3 changed files with 112 additions and 40 deletions
+5 -8
View File
@@ -171,14 +171,11 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
panic_hook::set();
let yaml = load_yaml!("./cli.yml");
let matches = match clap::App::from_yaml(yaml).version(&(crate_version!().to_owned() + "\n")[..]).get_matches_from_safe(args) {
Ok(m) => m,
Err(ref e) if e.kind == clap::ErrorKind::VersionDisplayed => return Ok(()),
Err(ref e) if e.kind == clap::ErrorKind::HelpDisplayed => {
print!("{}", e);
return Ok(())
}
Err(e) => e.exit(),
let matches = match clap::App::from_yaml(yaml)
.version(&(crate_version!().to_owned() + "\n")[..])
.get_matches_from_safe(args) {
Ok(m) => m,
Err(e) => e.exit(),
};
// TODO [ToDr] Split parameters parsing from actual execution.
@@ -17,7 +17,7 @@
use bytes::Bytes;
use fnv::{FnvHashMap, FnvHashSet};
use futures::sync::mpsc;
use libp2p::core::{Multiaddr, AddrComponent, Endpoint, UniqueConnec};
use libp2p::core::{multiaddr::ToMultiaddr, Multiaddr, AddrComponent, Endpoint, UniqueConnec};
use libp2p::core::{UniqueConnecState, PeerId as PeerstorePeerId, PublicKey};
use libp2p::kad::KadConnecController;
use libp2p::peerstore::{Peerstore, PeerAccess};
@@ -150,10 +150,6 @@ impl NetworkState {
PeersStorage::Memory(MemoryPeerstore::empty())
};
for bootnode in config.boot_nodes.iter() {
parse_and_add_to_peerstore(bootnode, &peerstore)?;
}
let reserved_peers = {
let mut reserved_peers = FnvHashSet::with_capacity_and_hasher(
config.reserved_nodes.len(),
@@ -391,6 +387,12 @@ impl NetworkState {
Ok(peer_id)
}
/// Adds a peer to the internal peer store.
/// Returns an error if the peer address is invalid.
pub fn add_peer(&self, peer: &str) -> Result<PeerstorePeerId, Error> {
parse_and_add_to_peerstore(peer, &self.peerstore)
}
/// Adds a reserved peer to the list of reserved peers.
/// Returns an error if the peer address is invalid.
pub fn add_reserved_peer(&self, peer: &str) -> Result<(), Error> {
@@ -753,13 +755,12 @@ fn num_open_custom_connections(connections: &Connections) -> u32 {
/// to the given peerstore. Returns the corresponding peer ID.
fn parse_and_add_to_peerstore(addr_str: &str, peerstore: &PeersStorage)
-> Result<PeerstorePeerId, Error> {
let mut addr: Multiaddr = addr_str.parse()
.map_err(|_| ErrorKind::AddressParse)?;
let p2p_component = addr.pop().ok_or(ErrorKind::AddressParse)?;
let peer_id = match p2p_component {
AddrComponent::P2P(key) | AddrComponent::IPFS(key) =>
let mut addr = addr_str.to_multiaddr().map_err(|_| ErrorKind::AddressParse)?;
let peer_id = match addr.pop() {
Some(AddrComponent::P2P(key)) | Some(AddrComponent::IPFS(key)) =>
PeerstorePeerId::from_bytes(key).map_err(|_| ErrorKind::AddressParse)?,
_ => return Err(ErrorKind::BadProtocol.into()),
_ => return Err(ErrorKind::AddressParse.into()),
};
// Registering the bootstrap node with a TTL of 100000 years TODO: wrong
@@ -473,29 +473,51 @@ fn init_thread(
},
}
}
// Explicitely connect to the boostrap nodes as a temporary measure.
trace!(target: "sub-libp2p", "Dialing bootnodes");
// Explicitely connect to _all_ the boostrap nodes as a temporary measure.
for bootnode in shared.config.boot_nodes.iter() {
// TODO: this code is copy-pasted from `network_state`, but it is
// temporary anyway
let mut addr: Multiaddr = bootnode.parse()
.map_err(|_| ErrorKind::AddressParse)?;
let p2p_component = addr.pop().ok_or(ErrorKind::AddressParse)?;
let peer_id = match p2p_component {
AddrComponent::P2P(key) | AddrComponent::IPFS(key) =>
PeerstorePeerId::from_bytes(key).map_err(|_| ErrorKind::AddressParse)?,
_ => return Err(ErrorKind::BadProtocol.into()),
};
match shared.network_state.add_peer(bootnode) {
Ok(peer_id) => {
trace!(target: "sub-libp2p", "Dialing bootnode {:?}", peer_id);
for proto in shared.protocols.read().0.clone().into_iter() {
open_peer_custom_proto(
shared.clone(),
transport.clone(),
proto,
peer_id.clone(),
&swarm_controller
)
}
},
Err(Error(ErrorKind::AddressParse, _)) => {
// fallback: trying with IP:Port
let multi = match bootnode.parse::<SocketAddr>() {
Ok(SocketAddr::V4(socket)) =>
format!("/ip4/{}/tcp/{}", socket.ip(), socket.port()).parse::<Multiaddr>(),
Ok(SocketAddr::V6(socket)) =>
format!("/ip6/{}/tcp/{}", socket.ip(), socket.port()).parse::<Multiaddr>(),
_ => {
warn!(target: "sub-libp2p", "Not a valid Bootnode Address {:}", bootnode);
continue;
}
};
for proto in shared.protocols.read().0.clone().into_iter() {
open_peer_custom_proto(
shared.clone(),
transport.clone(),
proto,
peer_id.clone(),
&swarm_controller
)
if let Ok(addr) = multi {
trace!(target: "sub-libp2p", "Missing PeerId for Bootnode {:}. Querying", bootnode);
for proto in shared.protocols.read().0.clone().into_iter() {
connect_with_query_peer_id(
shared.clone(),
transport.clone(),
proto,
addr.clone(),
&swarm_controller
)
}
} else {
warn!(target: "sub-libp2p", "Not a valid Bootnode Address {:}", bootnode);
continue;
}
},
Err(err) => warn!(target:"sub-libp2p", "Couldn't parse Bootnode Address: {}", err),
}
}
@@ -978,6 +1000,58 @@ fn connect_to_nodes<T, To, St, C>(
}
}
fn connect_with_query_peer_id<T, To, St, C>(
shared: Arc<Shared>,
base_transport: T,
proto: RegisteredProtocol<Arc<NetworkProtocolHandler + Send + Sync>>,
addr: Multiaddr,
swarm_controller: &SwarmController<St>
)
where T: MuxedTransport<Output = TransportOutput<To>> + Clone + 'static,
T::MultiaddrFuture: 'static,
To: AsyncRead + AsyncWrite + 'static,
St: MuxedTransport<Output = FinalUpgrade<C>> + Clone + 'static,
C: 'static,
{
let addr2 = addr.clone();
let with_proto = base_transport
.clone()
.and_then(move |out, endpoint, client_addr| {
trace!(target: "sub-libp2p", "in");
let socket = out.socket;
let original_addr = out.original_addr;
out.info
.and_then(move |info| {
let _ = process_identify_info(shared, &info, original_addr,
endpoint, &base_transport);
trace!(target: "sub-libp2p", "Bootnode {:} found with peer id: {:?}",
addr2, info.info.public_key.into_peer_id());
upgrade::apply(socket, proto, endpoint, client_addr)
})
})
.and_then(move |out, _endpoint, client_addr|
client_addr.map(move |client_addr|
(FinalUpgrade::Custom(out, client_addr.clone()), future::ok(client_addr))
)
);
let with_timeout = TransportTimeout::new(with_proto, Duration::from_secs(10));
let with_err = with_timeout
.map_err({
let addr = addr.clone();
move |err| {
warn!(target: "sub-libp2p", "Error while dialing {:?} to query peer id: {:?}",
addr, err);
err
}
});
let _ = swarm_controller.dial(addr.clone(), with_err)
.map_err( move |err| warn!(target: "sub-libp2p",
"Error when querying peer node info {:} of {:}", err, addr));
}
/// If necessary, dials the given address for the given protocol and using the
/// given `swarm_controller`. Has no effect if we already dialed earlier.
/// Checks that the peer ID matches `expected_peer_id`.