Style fixes (#396)

* Fix p2p

* Cosmetic stuff

* More cosmetics

* Whitespace

* Whitespace

* Whitespace

* Renames

* Most cosmetics

* typo

* minor rename

* Remote-end disconnect message should be info!

* invalid tab

* Avoid ignoring sustained bad pings

* Remove workarounds.
This commit is contained in:
Gav Wood
2018-07-23 16:46:13 +02:00
committed by GitHub
parent fcb30f4009
commit 077c3d3cb7
17 changed files with 628 additions and 612 deletions
+53 -53
View File
@@ -49,7 +49,7 @@ use parking_lot::Mutex;
use polkadot_consensus::{Statement, SignedStatement, GenericStatement};
use polkadot_primitives::{AccountId, Block, SessionKey, Hash, Header};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt, Collation};
use substrate_network::{PeerId, RequestId, Context, Severity};
use substrate_network::{NodeIndex, RequestId, Context, Severity};
use substrate_network::consensus_gossip::ConsensusGossip;
use substrate_network::{message, generic_message};
use substrate_network::specialization::Specialization;
@@ -244,7 +244,7 @@ impl Decode for Message {
}
}
fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message) {
fn send_polkadot_message(ctx: &mut Context<Block>, to: NodeIndex, message: Message) {
trace!(target: "p_net", "Sending polkadot message to {}: {:?}", to, message);
let encoded = message.encode();
ctx.send_message(to, generic_message::Message::ChainSpecific(encoded))
@@ -252,14 +252,14 @@ fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message)
/// Polkadot protocol attachment for substrate.
pub struct PolkadotProtocol {
peers: HashMap<PeerId, PeerInfo>,
peers: HashMap<NodeIndex, PeerInfo>,
collating_for: Option<(AccountId, ParaId)>,
consensus_gossip: ConsensusGossip<Block>,
collators: CollatorPool,
validators: HashMap<SessionKey, PeerId>,
validators: HashMap<SessionKey, NodeIndex>,
local_collations: LocalCollations<Collation>,
live_consensus: Option<CurrentConsensus>,
in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>,
in_flight: HashMap<(RequestId, NodeIndex), BlockDataRequest>,
pending: Vec<BlockDataRequest>,
next_req_id: u64,
}
@@ -352,17 +352,17 @@ impl PolkadotProtocol {
.map(|(_, id)| id);
// dispatch to peer
if let Some(peer_id) = next_peer {
if let Some(who) = next_peer {
let req_id = self.next_req_id;
self.next_req_id += 1;
send_polkadot_message(
ctx,
peer_id,
who,
Message::RequestBlockData(req_id, pending.candidate_hash)
);
self.in_flight.insert((req_id, peer_id), pending);
self.in_flight.insert((req_id, who), pending);
continue;
}
@@ -374,36 +374,36 @@ impl PolkadotProtocol {
self.pending = new_pending;
}
fn on_polkadot_message(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, raw: Vec<u8>, msg: Message) {
trace!(target: "p_net", "Polkadot message from {}: {:?}", peer_id, msg);
fn on_polkadot_message(&mut self, ctx: &mut Context<Block>, who: NodeIndex, raw: Vec<u8>, msg: Message) {
trace!(target: "p_net", "Polkadot message from {}: {:?}", who, msg);
match msg {
Message::Statement(parent_hash, _statement) =>
self.consensus_gossip.on_chain_specific(ctx, peer_id, raw, parent_hash),
Message::SessionKey(key) => self.on_session_key(ctx, peer_id, key),
self.consensus_gossip.on_chain_specific(ctx, who, raw, parent_hash),
Message::SessionKey(key) => self.on_session_key(ctx, who, key),
Message::RequestBlockData(req_id, hash) => {
let block_data = self.live_consensus.as_ref()
.and_then(|c| c.block_data(&hash));
send_polkadot_message(ctx, peer_id, Message::BlockData(req_id, block_data));
send_polkadot_message(ctx, who, Message::BlockData(req_id, block_data));
}
Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data),
Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation),
Message::CollatorRole(role) => self.on_new_role(ctx, peer_id, role),
Message::BlockData(req_id, data) => self.on_block_data(ctx, who, req_id, data),
Message::Collation(relay_parent, collation) => self.on_collation(ctx, who, relay_parent, collation),
Message::CollatorRole(role) => self.on_new_role(ctx, who, role),
}
}
fn on_session_key(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, key: SessionKey) {
fn on_session_key(&mut self, ctx: &mut Context<Block>, who: NodeIndex, key: SessionKey) {
{
let info = match self.peers.get_mut(&peer_id) {
let info = match self.peers.get_mut(&who) {
Some(peer) => peer,
None => {
trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", peer_id);
trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who);
return
}
};
if !info.claimed_validator {
ctx.report_peer(peer_id, Severity::Bad("Session key broadcasted without setting authority role"));
ctx.report_peer(who, Severity::Bad("Session key broadcasted without setting authority role"));
return;
}
@@ -413,20 +413,20 @@ impl PolkadotProtocol {
for (relay_parent, collation) in self.local_collations.fresh_key(&old_key, &key) {
send_polkadot_message(
ctx,
peer_id,
who,
Message::Collation(relay_parent, collation),
)
}
}
self.validators.insert(key, peer_id);
self.validators.insert(key, who);
}
self.dispatch_pending_requests(ctx);
}
fn on_block_data(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, req_id: RequestId, data: Option<BlockData>) {
match self.in_flight.remove(&(req_id, peer_id)) {
fn on_block_data(&mut self, ctx: &mut Context<Block>, who: NodeIndex, req_id: RequestId, data: Option<BlockData>) {
match self.in_flight.remove(&(req_id, who)) {
Some(req) => {
if let Some(data) = data {
if data.hash() == req.block_data_hash {
@@ -438,29 +438,29 @@ impl PolkadotProtocol {
self.pending.push(req);
self.dispatch_pending_requests(ctx);
}
None => ctx.report_peer(peer_id, Severity::Bad("Unexpected block data response")),
None => ctx.report_peer(who, Severity::Bad("Unexpected block data response")),
}
}
// when a validator sends us (a collator) a new role.
fn on_new_role(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, role: Role) {
let info = match self.peers.get(&peer_id) {
fn on_new_role(&mut self, ctx: &mut Context<Block>, who: NodeIndex, role: Role) {
let info = match self.peers.get(&who) {
Some(peer) => peer,
None => {
trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", peer_id);
trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who);
return
}
};
match info.validator_key {
None => ctx.report_peer(
peer_id,
who,
Severity::Bad("Sent collator role without registering first as validator"),
),
Some(key) => for (relay_parent, collation) in self.local_collations.note_validator_role(key, role) {
send_polkadot_message(
ctx,
peer_id,
who,
Message::Collation(relay_parent, collation),
)
},
@@ -473,7 +473,7 @@ impl Specialization<Block> for PolkadotProtocol {
Status { collating_for: self.collating_for.clone() }.encode()
}
fn on_connect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, status: FullStatus) {
fn on_connect(&mut self, ctx: &mut Context<Block>, who: NodeIndex, status: FullStatus) {
let local_status = match Status::decode(&mut &status.chain_status[..]) {
Some(status) => status,
None => {
@@ -483,14 +483,14 @@ impl Specialization<Block> for PolkadotProtocol {
if let Some((ref acc_id, ref para_id)) = local_status.collating_for {
if self.collator_peer_id(acc_id.clone()).is_some() {
ctx.report_peer(peer_id, Severity::Useless("Unknown Polkadot-specific reason"));
ctx.report_peer(who, Severity::Useless("Unknown Polkadot-specific reason"));
return
}
let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone());
send_polkadot_message(
ctx,
peer_id,
who,
Message::CollatorRole(collator_role),
);
}
@@ -498,17 +498,17 @@ impl Specialization<Block> for PolkadotProtocol {
let validator = status.roles.contains(substrate_network::Roles::AUTHORITY);
let send_key = validator || local_status.collating_for.is_some();
self.peers.insert(peer_id, PeerInfo {
self.peers.insert(who, PeerInfo {
collating_for: local_status.collating_for,
validator_key: None,
claimed_validator: validator,
});
self.consensus_gossip.new_peer(ctx, peer_id, status.roles);
self.consensus_gossip.new_peer(ctx, who, status.roles);
if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) {
send_polkadot_message(
ctx,
peer_id,
who,
Message::SessionKey(consensus.local_session_key)
);
}
@@ -516,8 +516,8 @@ impl Specialization<Block> for PolkadotProtocol {
self.dispatch_pending_requests(ctx);
}
fn on_disconnect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId) {
if let Some(info) = self.peers.remove(&peer_id) {
fn on_disconnect(&mut self, ctx: &mut Context<Block>, who: NodeIndex) {
if let Some(info) = self.peers.remove(&who) {
if let Some((acc_id, _)) = info.collating_for {
let new_primary = self.collators.on_disconnect(acc_id)
.and_then(|new_primary| self.collator_peer_id(new_primary));
@@ -539,7 +539,7 @@ impl Specialization<Block> for PolkadotProtocol {
{
let pending = &mut self.pending;
self.in_flight.retain(|&(_, ref peer), val| {
let retain = peer != &peer_id;
let retain = peer != &who;
if !retain {
let (sender, _) = oneshot::channel();
pending.push(::std::mem::replace(val, BlockDataRequest {
@@ -554,24 +554,24 @@ impl Specialization<Block> for PolkadotProtocol {
retain
});
}
self.consensus_gossip.peer_disconnected(ctx, peer_id);
self.consensus_gossip.peer_disconnected(ctx, who);
self.dispatch_pending_requests(ctx);
}
}
fn on_message(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, message: message::Message<Block>) {
fn on_message(&mut self, ctx: &mut Context<Block>, who: NodeIndex, message: message::Message<Block>) {
match message {
generic_message::Message::BftMessage(msg) => {
trace!(target: "p_net", "Polkadot BFT message from {}: {:?}", peer_id, msg);
trace!(target: "p_net", "Polkadot BFT message from {}: {:?}", who, msg);
// TODO: check signature here? what if relevant block is unknown?
self.consensus_gossip.on_bft_message(ctx, peer_id, msg)
self.consensus_gossip.on_bft_message(ctx, who, msg)
}
generic_message::Message::ChainSpecific(raw) => {
match Message::decode(&mut raw.as_slice()) {
Some(msg) => self.on_polkadot_message(ctx, peer_id, raw, msg),
Some(msg) => self.on_polkadot_message(ctx, who, raw, msg),
None => {
trace!(target: "p_net", "Bad message from {}", peer_id);
ctx.report_peer(peer_id, Severity::Bad("Invalid polkadot protocol message format"));
trace!(target: "p_net", "Bad message from {}", who);
ctx.report_peer(who, Severity::Bad("Invalid polkadot protocol message format"));
}
}
}
@@ -611,7 +611,7 @@ impl Specialization<Block> for PolkadotProtocol {
impl PolkadotProtocol {
// we received a collation from a peer
fn on_collation(&mut self, ctx: &mut Context<Block>, from: PeerId, relay_parent: Hash, collation: Collation) {
fn on_collation(&mut self, ctx: &mut Context<Block>, from: NodeIndex, relay_parent: Hash, collation: Collation) {
let collation_para = collation.receipt.parachain_index;
let collated_acc = collation.receipt.collator;
@@ -638,7 +638,7 @@ impl PolkadotProtocol {
}
// get connected peer with given account ID for collation.
fn collator_peer_id(&self, account_id: AccountId) -> Option<PeerId> {
fn collator_peer_id(&self, account_id: AccountId) -> Option<NodeIndex> {
let check_info = |info: &PeerInfo| info
.collating_for
.as_ref()
@@ -647,14 +647,14 @@ impl PolkadotProtocol {
self.peers
.iter()
.filter(|&(_, info)| check_info(info))
.map(|(peer_id, _)| *peer_id)
.map(|(who, _)| *who)
.next()
}
// disconnect a collator by account-id.
fn disconnect_bad_collator(&self, ctx: &mut Context<Block>, account_id: AccountId) {
if let Some(peer_id) = self.collator_peer_id(account_id) {
ctx.report_peer(peer_id, Severity::Bad("Consensus layer determined the given collator misbehaved"))
if let Some(who) = self.collator_peer_id(account_id) {
ctx.report_peer(who, Severity::Bad("Consensus layer determined the given collator misbehaved"))
}
}
}
@@ -670,9 +670,9 @@ impl PolkadotProtocol {
) {
for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) {
match self.validators.get(&primary) {
Some(peer_id) => send_polkadot_message(
Some(who) => send_polkadot_message(
ctx,
*peer_id,
*who,
Message::Collation(relay_parent, cloned_collation),
),
None =>
+13 -13
View File
@@ -24,16 +24,16 @@ use polkadot_primitives::{Block, Hash, SessionKey};
use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData};
use substrate_primitives::H512;
use codec::Encode;
use substrate_network::{Severity, PeerId, PeerInfo, ClientHandle, Context, Roles, message::Message as SubstrateMessage, specialization::Specialization, generic_message::Message as GenericMessage};
use substrate_network::{Severity, NodeIndex, PeerInfo, ClientHandle, Context, Roles, message::Message as SubstrateMessage, specialization::Specialization, generic_message::Message as GenericMessage};
use std::sync::Arc;
use futures::Future;
#[derive(Default)]
struct TestContext {
disabled: Vec<PeerId>,
disconnected: Vec<PeerId>,
messages: Vec<(PeerId, SubstrateMessage<Block>)>,
disabled: Vec<NodeIndex>,
disconnected: Vec<NodeIndex>,
messages: Vec<(NodeIndex, SubstrateMessage<Block>)>,
}
impl Context<Block> for TestContext {
@@ -41,24 +41,24 @@ impl Context<Block> for TestContext {
unimplemented!()
}
fn report_peer(&mut self, peer: PeerId, reason: Severity) {
fn report_peer(&mut self, peer: NodeIndex, reason: Severity) {
match reason {
Severity::Bad(_) => self.disabled.push(peer),
_ => self.disconnected.push(peer),
}
}
fn peer_info(&self, _peer: PeerId) -> Option<PeerInfo<Block>> {
fn peer_info(&self, _peer: NodeIndex) -> Option<PeerInfo<Block>> {
unimplemented!()
}
fn send_message(&mut self, peer_id: PeerId, data: SubstrateMessage<Block>) {
self.messages.push((peer_id, data))
fn send_message(&mut self, who: NodeIndex, data: SubstrateMessage<Block>) {
self.messages.push((who, data))
}
}
impl TestContext {
fn has_message(&self, to: PeerId, message: Message) -> bool {
fn has_message(&self, to: NodeIndex, message: Message) -> bool {
use substrate_network::generic_message::Message as GenericMessage;
let encoded = message.encode();
@@ -91,7 +91,7 @@ fn make_consensus(parent_hash: Hash, local_key: SessionKey) -> (CurrentConsensus
(c, knowledge)
}
fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: PeerId, message: Message) {
fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: NodeIndex, message: Message) {
let encoded = message.encode();
protocol.on_message(ctx, from, GenericMessage::ChainSpecific(encoded));
}
@@ -209,19 +209,19 @@ fn fetches_from_those_with_knowledge() {
fn remove_bad_collator() {
let mut protocol = PolkadotProtocol::new(None);
let peer_id = 1;
let who = 1;
let account_id = [2; 32].into();
let status = Status { collating_for: Some((account_id, 5.into())) };
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_id, make_status(&status, Roles::NONE));
protocol.on_connect(&mut ctx, who, make_status(&status, Roles::NONE));
}
{
let mut ctx = TestContext::default();
protocol.disconnect_bad_collator(&mut ctx, account_id);
assert!(ctx.disabled.contains(&peer_id));
assert!(ctx.disabled.contains(&who));
}
}
@@ -18,7 +18,7 @@ use bytes::Bytes;
use fnv::{FnvHashMap, FnvHashSet};
use futures::sync::mpsc;
use libp2p::core::{multiaddr::ToMultiaddr, Multiaddr, AddrComponent, Endpoint, UniqueConnec};
use libp2p::core::{UniqueConnecState, PeerId as PeerstorePeerId, PublicKey};
use libp2p::core::{UniqueConnecState, PeerId, PublicKey};
use libp2p::kad::KadConnecController;
use libp2p::peerstore::{Peerstore, PeerAccess};
use libp2p::peerstore::json_peerstore::JsonPeerstore;
@@ -26,7 +26,7 @@ use libp2p::peerstore::memory_peerstore::MemoryPeerstore;
use libp2p::ping::Pinger;
use libp2p::secio;
use {Error, ErrorKind, NetworkConfiguration, NonReservedPeerMode};
use {PeerId, ProtocolId, SessionInfo};
use {NodeIndex, ProtocolId, SessionInfo};
use parking_lot::{Mutex, RwLock};
use rand::{self, Rng};
use std::cmp;
@@ -46,7 +46,7 @@ const PEER_DISABLE_DURATION: Duration = Duration::from_secs(5 * 60);
// Common struct shared throughout all the components of the service.
pub struct NetworkState {
/// Contains the information about the network.
peerstore: PeersStorage,
node_store: NodeStore,
/// Active connections.
connections: RwLock<Connections>,
@@ -59,14 +59,14 @@ pub struct NetworkState {
/// If true, only reserved peers can connect.
reserved_only: atomic::AtomicBool,
/// List of the IDs of the reserved peers.
reserved_peers: RwLock<FnvHashSet<PeerstorePeerId>>,
reserved_peers: RwLock<FnvHashSet<PeerId>>,
/// Each peer gets assigned a new unique ID. This ID increases linearly.
next_peer_id: atomic::AtomicUsize,
/// Each node we discover gets assigned a new unique ID. This ID increases linearly.
next_node_index: atomic::AtomicUsize,
/// List of the IDs of the disabled peers. These peers will see their
/// connections refused. Includes the time when the disabling expires.
disabled_peers: Mutex<FnvHashMap<PeerstorePeerId, Instant>>,
disabled_nodes: Mutex<FnvHashMap<PeerId, Instant>>,
/// Local private key.
local_private_key: secio::SecioKeyPair,
@@ -74,7 +74,7 @@ pub struct NetworkState {
local_public_key: PublicKey,
}
enum PeersStorage {
enum NodeStore {
/// Peers are stored in memory. Nothing is stored on disk.
Memory(MemoryPeerstore),
/// Peers are stored in a JSON file on the disk.
@@ -84,10 +84,10 @@ enum PeersStorage {
struct Connections {
/// For each libp2p peer ID, the ID of the peer in the API we expose.
/// Also corresponds to the index in `info_by_peer`.
peer_by_nodeid: FnvHashMap<PeerstorePeerId, usize>,
peer_by_nodeid: FnvHashMap<PeerId, usize>,
/// For each peer ID, information about our connection to this peer.
info_by_peer: FnvHashMap<PeerId, PeerConnectionInfo>,
info_by_peer: FnvHashMap<NodeIndex, PeerConnectionInfo>,
}
struct PeerConnectionInfo {
@@ -104,7 +104,7 @@ struct PeerConnectionInfo {
ping_connec: UniqueConnec<Pinger>,
/// Id of the peer.
id: PeerstorePeerId,
id: PeerId,
/// True if this connection was initiated by us.
/// Note that it is theoretically possible that we dial the remote at the
@@ -130,7 +130,7 @@ struct PeerConnectionInfo {
#[derive(Debug, Clone)]
pub struct PeerInfo {
/// Id of the peer.
pub id: PeerstorePeerId,
pub id: PeerId,
/// True if this connection was initiated by us.
/// Note that it is theoretically possible that we dial the remote at the
@@ -172,21 +172,21 @@ impl NetworkState {
let local_public_key = local_private_key.to_public_key();
// Build the storage for peers, including the bootstrap nodes.
let peerstore = if let Some(ref path) = config.net_config_path {
let node_store = if let Some(ref path) = config.net_config_path {
let path = Path::new(path).join(NODES_FILE);
if let Ok(peerstore) = JsonPeerstore::new(path.clone()) {
if let Ok(node_store) = JsonPeerstore::new(path.clone()) {
debug!(target: "sub-libp2p", "Initialized peer store for JSON \
file {:?}", path);
PeersStorage::Json(peerstore)
NodeStore::Json(node_store)
} else {
warn!(target: "sub-libp2p", "Failed to open peer storage {:?} \
; peers won't be saved", path);
PeersStorage::Memory(MemoryPeerstore::empty())
NodeStore::Memory(MemoryPeerstore::empty())
}
} else {
debug!(target: "sub-libp2p", "No peers file configured ; peers \
won't be saved");
PeersStorage::Memory(MemoryPeerstore::empty())
NodeStore::Memory(MemoryPeerstore::empty())
};
let reserved_peers = {
@@ -195,7 +195,7 @@ impl NetworkState {
Default::default()
);
for peer in config.reserved_nodes.iter() {
let id = parse_and_add_to_peerstore(peer, &peerstore)?;
let id = parse_and_add_to_node_store(peer, &node_store)?;
reserved_peers.insert(id);
}
RwLock::new(reserved_peers)
@@ -205,7 +205,7 @@ impl NetworkState {
config.reserved_nodes.len());
Ok(NetworkState {
peerstore,
node_store,
min_peers: config.min_peers,
max_peers: config.max_peers,
connections: RwLock::new(Connections {
@@ -214,8 +214,8 @@ impl NetworkState {
}),
reserved_only: atomic::AtomicBool::new(false),
reserved_peers,
next_peer_id: atomic::AtomicUsize::new(0),
disabled_peers: Mutex::new(Default::default()),
next_node_index: atomic::AtomicUsize::new(0),
disabled_nodes: Mutex::new(Default::default()),
local_private_key,
local_public_key,
})
@@ -234,13 +234,13 @@ impl NetworkState {
/// Returns the ID of a random peer of the network.
///
/// Returns `None` if we don't know any peer.
pub fn random_peer(&self) -> Option<PeerstorePeerId> {
// TODO: optimize by putting the operation directly in the peerstore
pub fn random_peer(&self) -> Option<PeerId> {
// TODO: optimize by putting the operation directly in the node_store
// https://github.com/libp2p/rust-libp2p/issues/316
let peers = match self.peerstore {
PeersStorage::Memory(ref mem) =>
let peers = match self.node_store {
NodeStore::Memory(ref mem) =>
mem.peers().collect::<Vec<_>>(),
PeersStorage::Json(ref json) =>
NodeStore::Json(ref json) =>
json.peers().collect::<Vec<_>>(),
};
@@ -255,11 +255,11 @@ impl NetworkState {
/// Returns all the IDs of the peers on the network we have knowledge of.
///
/// This includes peers we are not connected to.
pub fn known_peers(&self) -> impl Iterator<Item = PeerstorePeerId> {
match self.peerstore {
PeersStorage::Memory(ref mem) =>
pub fn known_peers(&self) -> impl Iterator<Item = PeerId> {
match self.node_store {
NodeStore::Memory(ref mem) =>
mem.peers().collect::<Vec<_>>().into_iter(),
PeersStorage::Json(ref json) =>
NodeStore::Json(ref json) =>
json.peers().collect::<Vec<_>>().into_iter(),
}
}
@@ -270,34 +270,32 @@ impl NetworkState {
}
/// Get a list of all connected peers by id.
pub fn connected_peers(&self) -> Vec<PeerId> {
pub fn connected_peers(&self) -> Vec<NodeIndex> {
self.connections.read().peer_by_nodeid.values().cloned().collect()
}
/// Returns true if the given `PeerId` is valid.
/// Returns true if the given `NodeIndex` is valid.
///
/// `PeerId`s are never reused, so once this function returns `false` it
/// will never return `true` again for the same `PeerId`.
pub fn is_peer_connected(&self, peer: PeerId) -> bool {
/// `NodeIndex`s are never reused, so once this function returns `false` it
/// will never return `true` again for the same `NodeIndex`.
pub fn is_peer_connected(&self, peer: NodeIndex) -> bool {
self.connections.read().info_by_peer.contains_key(&peer)
}
/// Reports the ping of the peer. Returned later by `session_info()`.
/// No-op if the `peer_id` is not valid/expired.
pub fn report_ping_duration(&self, peer_id: PeerId, ping: Duration) {
let connections = self.connections.read();
let info = match connections.info_by_peer.get(&peer_id) {
/// No-op if the `who` is not valid/expired.
pub fn report_ping_duration(&self, who: NodeIndex, ping: Duration) {
let mut connections = self.connections.write();
let info = match connections.info_by_peer.get_mut(&who) {
Some(info) => info,
None => return,
};
*info.ping.lock() = Some(ping);
}
/// If we're connected to a peer with the given protocol, returns
/// information about the connection. Otherwise, returns `None`.
pub fn session_info(&self, peer: PeerId, protocol: ProtocolId)
-> Option<SessionInfo> {
pub fn session_info(&self, peer: NodeIndex, protocol: ProtocolId) -> Option<SessionInfo> {
let connections = self.connections.read();
let info = match connections.info_by_peer.get(&peer) {
Some(info) => info,
@@ -333,8 +331,7 @@ impl NetworkState {
/// If we're connected to a peer with the given protocol, returns the
/// protocol version. Otherwise, returns `None`.
pub fn protocol_version(&self, peer: PeerId, protocol: ProtocolId)
-> Option<u8> {
pub fn protocol_version(&self, peer: NodeIndex, protocol: ProtocolId) -> Option<u8> {
let connections = self.connections.read();
let peer = match connections.info_by_peer.get(&peer) {
Some(peer) => peer,
@@ -348,8 +345,7 @@ impl NetworkState {
}
/// Equivalent to `session_info(peer).map(|info| info.client_version)`.
pub fn peer_client_version(&self, peer: PeerId, protocol: ProtocolId)
-> Option<String> {
pub fn peer_client_version(&self, peer: NodeIndex, protocol: ProtocolId) -> Option<String> {
// TODO: implement more directly, without going through `session_info`
self.session_info(peer, protocol)
.map(|info| info.client_version)
@@ -357,14 +353,14 @@ impl NetworkState {
/// Adds an address discovered by Kademlia.
/// Note that we don't have to be connected to a peer to add an address.
pub fn add_kad_discovered_addr(&self, node_id: &PeerstorePeerId, addr: Multiaddr) {
pub fn add_kad_discovered_addr(&self, node_id: &PeerId, addr: Multiaddr) {
trace!(target: "sub-libp2p", "Peer store: adding address {} for {:?}",
addr, node_id);
match self.peerstore {
PeersStorage::Memory(ref mem) =>
match self.node_store {
NodeStore::Memory(ref mem) =>
mem.peer_or_create(node_id)
.add_addr(addr, Duration::from_secs(3600)),
PeersStorage::Json(ref json) =>
NodeStore::Json(ref json) =>
json.peer_or_create(node_id)
.add_addr(addr, Duration::from_secs(3600)),
}
@@ -373,15 +369,14 @@ impl NetworkState {
/// Signals that an address doesn't match the corresponding node ID.
/// This removes the address from the peer store, so that it is not
/// returned by `addrs_of_peer` again in the future.
pub fn set_invalid_kad_address(&self, node_id: &PeerstorePeerId,
addr: &Multiaddr) {
pub fn set_invalid_kad_address(&self, node_id: &PeerId, addr: &Multiaddr) {
// TODO: blacklist the address?
match self.peerstore {
PeersStorage::Memory(ref mem) =>
match self.node_store {
NodeStore::Memory(ref mem) =>
if let Some(mut peer) = mem.peer(node_id) {
peer.rm_addr(addr.clone()) // TODO: cloning necessary?
},
PeersStorage::Json(ref json) =>
NodeStore::Json(ref json) =>
if let Some(mut peer) = json.peer(node_id) {
peer.rm_addr(addr.clone()) // TODO: cloning necessary?
},
@@ -389,14 +384,14 @@ impl NetworkState {
}
/// Returns the known multiaddresses of a peer.
pub fn addrs_of_peer(&self, node_id: &PeerstorePeerId) -> Vec<Multiaddr> {
match self.peerstore {
PeersStorage::Memory(ref mem) =>
pub fn addrs_of_peer(&self, node_id: &PeerId) -> Vec<Multiaddr> {
match self.node_store {
NodeStore::Memory(ref mem) =>
mem.peer(node_id)
.into_iter()
.flat_map(|p| p.addrs())
.collect::<Vec<_>>(),
PeersStorage::Json(ref json) =>
NodeStore::Json(ref json) =>
json.peer(node_id)
.into_iter()
.flat_map(|p| p.addrs())
@@ -407,35 +402,35 @@ impl NetworkState {
/// Sets information about a peer.
pub fn set_peer_info(
&self,
node_id: PeerstorePeerId,
node_id: PeerId,
endpoint: Endpoint,
client_version: String,
local_addr: Multiaddr,
remote_addr: Multiaddr
) -> Result<PeerId, IoError> {
) -> Result<NodeIndex, IoError> {
let mut connections = self.connections.write();
let peer_id = accept_connection(&mut connections, &self.next_peer_id,
let who = accept_connection(&mut connections, &self.next_node_index,
node_id.clone(), endpoint)?;
let infos = connections.info_by_peer.get_mut(&peer_id)
let infos = connections.info_by_peer.get_mut(&who)
.expect("Newly-created peer id is always valid");
infos.client_version = Some(client_version);
infos.remote_address = Some(remote_addr);
infos.local_address = Some(local_addr);
Ok(peer_id)
Ok(who)
}
/// Adds a peer to the internal peer store.
/// Returns an error if the peer address is invalid.
pub fn add_peer(&self, peer: &str) -> Result<PeerstorePeerId, Error> {
parse_and_add_to_peerstore(peer, &self.peerstore)
pub fn add_peer(&self, peer: &str) -> Result<PeerId, Error> {
parse_and_add_to_node_store(peer, &self.node_store)
}
/// Adds a reserved peer to the list of reserved peers.
/// Returns an error if the peer address is invalid.
pub fn add_reserved_peer(&self, peer: &str) -> Result<(), Error> {
let id = parse_and_add_to_peerstore(peer, &self.peerstore)?;
let id = parse_and_add_to_node_store(peer, &self.node_store)?;
self.reserved_peers.write().insert(id);
Ok(())
}
@@ -444,14 +439,14 @@ impl NetworkState {
/// active connection to this peer.
/// Returns an error if the peer address is invalid.
pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), Error> {
let id = parse_and_add_to_peerstore(peer, &self.peerstore)?;
let id = parse_and_add_to_node_store(peer, &self.node_store)?;
self.reserved_peers.write().remove(&id);
// Dropping the peer if we're in reserved mode.
if self.reserved_only.load(atomic::Ordering::SeqCst) {
let mut connections = self.connections.write();
if let Some(peer_id) = connections.peer_by_nodeid.remove(&id) {
connections.info_by_peer.remove(&peer_id);
if let Some(who) = connections.peer_by_nodeid.remove(&id) {
connections.info_by_peer.remove(&who);
}
}
@@ -486,7 +481,7 @@ impl NetworkState {
}
/// Returns true if we are connected to the given node.
pub fn has_connection(&self, node_id: &PeerstorePeerId) -> bool {
pub fn has_connection(&self, node_id: &PeerId) -> bool {
let connections = self.connections.read();
connections.peer_by_nodeid.contains_key(node_id)
}
@@ -494,56 +489,56 @@ impl NetworkState {
/// Obtains the `UniqueConnec` corresponding to the Kademlia connection to a peer.
pub fn kad_connection(
&self,
node_id: PeerstorePeerId
) -> Result<(PeerId, UniqueConnec<KadConnecController>), IoError> {
node_id: PeerId
) -> Result<(NodeIndex, UniqueConnec<KadConnecController>), IoError> {
// TODO: check that the peer is disabled? should disabling a peer also prevent
// kad from working?
let mut connections = self.connections.write();
let peer_id = accept_connection(&mut connections, &self.next_peer_id,
let who = accept_connection(&mut connections, &self.next_node_index,
node_id, Endpoint::Listener)?;
let infos = connections.info_by_peer.get_mut(&peer_id)
let infos = connections.info_by_peer.get_mut(&who)
.expect("Newly-created peer id is always valid");
let connec = infos.kad_connec.clone();
Ok((peer_id, connec))
Ok((who, connec))
}
/// Obtains the `UniqueConnec` corresponding to the Ping connection to a peer.
pub fn ping_connection(
&self,
node_id: PeerstorePeerId
) -> Result<(PeerId, UniqueConnec<Pinger>), IoError> {
node_id: PeerId
) -> Result<(NodeIndex, UniqueConnec<Pinger>), IoError> {
let mut connections = self.connections.write();
let peer_id = accept_connection(&mut connections, &self.next_peer_id,
let who = accept_connection(&mut connections, &self.next_node_index,
node_id, Endpoint::Listener)?;
let infos = connections.info_by_peer.get_mut(&peer_id)
let infos = connections.info_by_peer.get_mut(&who)
.expect("Newly-created peer id is always valid");
let connec = infos.ping_connec.clone();
Ok((peer_id, connec))
Ok((who, connec))
}
/// Cleans up inactive connections and returns a list of
/// connections to ping.
pub fn cleanup_and_prepare_ping(
&self
) -> Vec<(PeerId, PeerstorePeerId, UniqueConnec<Pinger>)> {
) -> Vec<(NodeIndex, PeerId, UniqueConnec<Pinger>)> {
let mut connections = self.connections.write();
let connections = &mut *connections;
let peer_by_nodeid = &mut connections.peer_by_nodeid;
let info_by_peer = &mut connections.info_by_peer;
let mut ret = Vec::with_capacity(info_by_peer.len());
info_by_peer.retain(|&peer_id, infos| {
info_by_peer.retain(|&who, infos| {
// Remove the peer if neither Kad nor any protocol is alive.
if !infos.kad_connec.is_alive() &&
!infos.protocols.iter().any(|(_, conn)| conn.is_alive())
{
peer_by_nodeid.remove(&infos.id);
trace!(target: "sub-libp2p", "Cleaning up expired peer \
#{:?} ({:?})", peer_id, infos.id);
#{:?} ({:?})", who, infos.id);
return false;
}
ret.push((peer_id, infos.id.clone(), infos.ping_connec.clone()));
ret.push((who, infos.id.clone(), infos.ping_connec.clone()));
true
});
ret
@@ -551,8 +546,8 @@ impl NetworkState {
/// Try to add a new connection to a node in the list.
///
/// Returns a `PeerId` to allow further interfacing with this connection.
/// Note that all `PeerId`s are unique and never reused.
/// Returns a `NodeIndex` to allow further interfacing with this connection.
/// Note that all `NodeIndex`s are unique and never reused.
///
/// Can return an error if we are refusing the connection to the remote.
///
@@ -563,25 +558,23 @@ impl NetworkState {
/// so by dropping this sender.
pub fn custom_proto(
&self,
node_id: PeerstorePeerId,
node_id: PeerId,
protocol_id: ProtocolId,
endpoint: Endpoint,
) -> Result<(PeerId, UniqueConnec<(mpsc::UnboundedSender<Bytes>, u8)>), IoError> {
) -> Result<(NodeIndex, UniqueConnec<(mpsc::UnboundedSender<Bytes>, u8)>), IoError> {
let mut connections = self.connections.write();
if is_peer_disabled(&self.disabled_peers, &node_id) {
debug!(target: "sub-libp2p", "Refusing node {:?} because it was \
disabled", node_id);
return Err(IoError::new(IoErrorKind::PermissionDenied,
"disabled peer"))
if is_peer_disabled(&self.disabled_nodes, &node_id) {
debug!(target: "sub-libp2p", "Refusing node {:?} because it was disabled", node_id);
return Err(IoError::new(IoErrorKind::PermissionDenied, "disabled peer"))
}
let peer_id = accept_connection(&mut connections, &self.next_peer_id,
let who = accept_connection(&mut connections, &self.next_node_index,
node_id.clone(), endpoint)?;
let num_open_connections = num_open_custom_connections(&connections);
let infos = connections.info_by_peer.get_mut(&peer_id)
let infos = connections.info_by_peer.get_mut(&who)
.expect("Newly-created peer id is always valid");
let node_is_reserved = self.reserved_peers.read().contains(&infos.id);
@@ -589,27 +582,24 @@ impl NetworkState {
if self.reserved_only.load(atomic::Ordering::Relaxed) ||
num_open_connections >= self.max_peers
{
debug!(target: "sub-libp2p", "Refusing node {:?} because we \
reached the max number of peers", node_id);
return Err(IoError::new(IoErrorKind::PermissionDenied,
"maximum number of peers reached"))
debug!(target: "sub-libp2p", "Refusing node {:?} because we reached the max number of peers", node_id);
return Err(IoError::new(IoErrorKind::PermissionDenied, "maximum number of peers reached"))
}
}
if let Some((_, ref uconn)) = infos.protocols.iter().find(|&(prot, _)| prot == &protocol_id) {
return Ok((peer_id, uconn.clone()))
return Ok((who, uconn.clone()))
}
let unique_connec = UniqueConnec::empty();
infos.protocols.push((protocol_id.clone(), unique_connec.clone()));
Ok((peer_id, unique_connec))
Ok((who, unique_connec))
}
/// Sends some data to the given peer, using the sender that was passed
/// to the `UniqueConnec` of `custom_proto`.
pub fn send(&self, protocol: ProtocolId, peer_id: PeerId, message: Bytes)
-> Result<(), Error> {
if let Some(peer) = self.connections.read().info_by_peer.get(&peer_id) {
pub fn send(&self, protocol: ProtocolId, who: NodeIndex, message: Bytes) -> Result<(), Error> {
if let Some(peer) = self.connections.read().info_by_peer.get(&who) {
let sender = peer.protocols.iter().find(|elem| elem.0 == protocol)
.and_then(|e| e.1.poll())
.map(|e| e.0);
@@ -622,37 +612,35 @@ impl NetworkState {
// protocol.
debug!(target: "sub-libp2p",
"Tried to send message to peer {} for which we aren't connected with the requested protocol",
peer_id
who
);
return Err(ErrorKind::PeerNotFound.into())
}
} else {
debug!(target: "sub-libp2p", "Tried to send message to invalid peer ID {}", peer_id);
debug!(target: "sub-libp2p", "Tried to send message to invalid peer ID {}", who);
return Err(ErrorKind::PeerNotFound.into())
}
}
/// Get the info on a peer, if there's an active connection.
pub fn peer_info(&self, who: PeerId) -> Option<PeerInfo> {
pub fn peer_info(&self, who: NodeIndex) -> Option<PeerInfo> {
self.connections.read().info_by_peer.get(&who).map(Into::into)
}
/// Reports that an attempt to make a low-level ping of the peer failed.
pub fn report_ping_failed(&self, who: NodeIndex) {
self.drop_peer(who);
}
/// Disconnects a peer, if a connection exists (ie. drops the Kademlia
/// controller, and the senders that were stored in the `UniqueConnec` of
/// `custom_proto`).
pub fn drop_peer(&self, peer_id: PeerId, reason: Option<&str>) {
pub fn drop_peer(&self, who: NodeIndex) {
let mut connections = self.connections.write();
if let Some(peer_info) = connections.info_by_peer.remove(&peer_id) {
if let Some(reason) = reason {
if let (&Some(ref client_version), &Some(ref remote_address)) = (&peer_info.client_version, &peer_info.remote_address) {
debug!(target: "sub-libp2p", "Disconnected peer {} (version: {}, address: {}). {}", peer_id, client_version, remote_address, reason);
} else {
debug!(target: "sub-libp2p", "Disconnected peer {}. {}", peer_id, reason);
}
}
trace!(target: "sub-libp2p", "Destroying peer #{} {:?} ; \
kademlia = {:?} ; num_protos = {:?}", peer_id, peer_info.id,
if let Some(peer_info) = connections.info_by_peer.remove(&who) {
trace!(target: "sub-libp2p", "Destroying peer #{} {:?} ; kademlia = {:?} ; num_protos = {:?}",
who,
peer_info.id,
peer_info.kad_connec.is_alive(),
peer_info.protocols.iter().filter(|c| c.1.is_alive()).count());
// TODO: we manually clear the connections as a work-around for
@@ -660,7 +648,7 @@ impl NetworkState {
for c in peer_info.protocols.iter() { c.1.clear(); }
peer_info.kad_connec.clear();
let old = connections.peer_by_nodeid.remove(&peer_info.id);
debug_assert_eq!(old, Some(peer_id));
debug_assert_eq!(old, Some(who));
}
}
@@ -681,18 +669,18 @@ impl NetworkState {
/// list of disabled peers, and drops any existing connections if
/// necessary (ie. drops the sender that was stored in the `UniqueConnec`
/// of `custom_proto`).
pub fn disable_peer(&self, peer_id: PeerId, reason: &str) {
pub fn ban_peer(&self, who: NodeIndex, reason: &str) {
// TODO: what do we do if the peer is reserved?
// TODO: same logging as in disconnect_peer
let mut connections = self.connections.write();
let peer_info = if let Some(peer_info) = connections.info_by_peer.remove(&peer_id) {
let peer_info = if let Some(peer_info) = connections.info_by_peer.remove(&who) {
if let (&Some(ref client_version), &Some(ref remote_address)) = (&peer_info.client_version, &peer_info.remote_address) {
info!(target: "network", "Peer {} (version: {}, address: {}) disabled. {}", peer_id, client_version, remote_address, reason);
info!(target: "network", "Peer {} (version: {}, address: {}) disabled. {}", who, client_version, remote_address, reason);
} else {
info!(target: "network", "Peer {} disabled. {}", peer_id, reason);
info!(target: "network", "Peer {} disabled. {}", who, reason);
}
let old = connections.peer_by_nodeid.remove(&peer_info.id);
debug_assert_eq!(old, Some(peer_id));
debug_assert_eq!(old, Some(who));
peer_info
} else {
return
@@ -700,7 +688,7 @@ impl NetworkState {
drop(connections);
let timeout = Instant::now() + PEER_DISABLE_DURATION;
self.disabled_peers.lock().insert(peer_info.id.clone(), timeout);
self.disabled_nodes.lock().insert(peer_info.id.clone(), timeout);
}
/// Flushes the caches to the disk.
@@ -708,18 +696,16 @@ impl NetworkState {
/// This is done in an atomical way, so that an error doesn't corrupt
/// anything.
pub fn flush_caches_to_disk(&self) -> Result<(), IoError> {
match self.peerstore {
PeersStorage::Memory(_) => Ok(()),
PeersStorage::Json(ref json) =>
match self.node_store {
NodeStore::Memory(_) => Ok(()),
NodeStore::Json(ref json) =>
match json.flush() {
Ok(()) => {
debug!(target: "sub-libp2p", "Flushed JSON peer store \
to disk");
debug!(target: "sub-libp2p", "Flushed JSON peer store to disk");
Ok(())
}
Err(err) => {
warn!(target: "sub-libp2p", "Failed to flush changes \
to JSON peer store: {}", err);
warn!(target: "sub-libp2p", "Failed to flush changes to JSON peer store: {}", err);
Err(err)
}
}
@@ -733,23 +719,22 @@ impl Drop for NetworkState {
}
}
/// Assigns a `PeerId` to a node, or returns an existing ID if any exists.
/// Assigns a `NodeIndex` to a node, or returns an existing ID if any exists.
///
/// The function only accepts already-locked structs, so that we don't risk
/// any deadlock.
fn accept_connection(
connections: &mut Connections,
next_peer_id: &atomic::AtomicUsize,
node_id: PeerstorePeerId,
next_node_index: &atomic::AtomicUsize,
node_id: PeerId,
endpoint: Endpoint
) -> Result<PeerId, IoError> {
) -> Result<NodeIndex, IoError> {
let peer_by_nodeid = &mut connections.peer_by_nodeid;
let info_by_peer = &mut connections.info_by_peer;
let peer_id = *peer_by_nodeid.entry(node_id.clone()).or_insert_with(|| {
let new_id = next_peer_id.fetch_add(1, atomic::Ordering::Relaxed);
trace!(target: "sub-libp2p", "Creating new peer #{:?} for {:?}",
new_id, node_id);
let who = *peer_by_nodeid.entry(node_id.clone()).or_insert_with(|| {
let new_id = next_node_index.fetch_add(1, atomic::Ordering::Relaxed);
trace!(target: "sub-libp2p", "Creating new peer #{:?} for {:?}", new_id, node_id);
info_by_peer.insert(new_id, PeerConnectionInfo {
protocols: Vec::new(), // TODO: Vec::with_capacity(num_registered_protocols),
@@ -765,13 +750,13 @@ fn accept_connection(
new_id
});
Ok(peer_id)
Ok(who)
}
/// Returns true if a peer is disabled.
fn is_peer_disabled(
list: &Mutex<FnvHashMap<PeerstorePeerId, Instant>>,
peer: &PeerstorePeerId
list: &Mutex<FnvHashMap<PeerId, Instant>>,
peer: &PeerId
) -> bool {
let mut list = list.lock();
if let Some(timeout) = list.get(peer).cloned() {
@@ -804,30 +789,32 @@ fn num_open_custom_connections(connections: &Connections) -> u32 {
}
/// Parses an address of the form `/ip4/x.x.x.x/tcp/x/p2p/xxxxxx`, and adds it
/// to the given peerstore. Returns the corresponding peer ID.
fn parse_and_add_to_peerstore(addr_str: &str, peerstore: &PeersStorage)
-> Result<PeerstorePeerId, Error> {
/// to the given node_store. Returns the corresponding peer ID.
fn parse_and_add_to_node_store(
addr_str: &str,
node_store: &NodeStore
) -> Result<PeerId, Error> {
let mut addr = addr_str.to_multiaddr().map_err(|_| ErrorKind::AddressParse)?;
let peer_id = match addr.pop() {
let who = match addr.pop() {
Some(AddrComponent::P2P(key)) | Some(AddrComponent::IPFS(key)) =>
PeerstorePeerId::from_bytes(key).map_err(|_| ErrorKind::AddressParse)?,
PeerId::from_bytes(key).map_err(|_| ErrorKind::AddressParse)?,
_ => return Err(ErrorKind::AddressParse.into()),
};
// Registering the bootstrap node with a TTL of 100000 years TODO: wrong
match peerstore {
PeersStorage::Memory(ref peerstore) =>
peerstore
.peer_or_create(&peer_id)
match node_store {
NodeStore::Memory(ref node_store) =>
node_store
.peer_or_create(&who)
.add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
PeersStorage::Json(ref peerstore) =>
peerstore
.peer_or_create(&peer_id)
NodeStore::Json(ref node_store) =>
node_store
.peer_or_create(&who)
.add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
}
Ok(peer_id)
Ok(who)
}
/// Obtains or generates the local private key using the configuration.
@@ -848,9 +835,11 @@ fn obtain_private_key(config: &NetworkConfiguration)
Ok(s) => Ok(s),
Err(err) => {
// Failed to fetch existing file ; generate a new key
trace!(target: "sub-libp2p", "Failed to load existing \
secret key file {:?}, generating new key ; err = {:?}",
secret_path, err);
trace!(target: "sub-libp2p",
"Failed to load existing secret key file {:?}, generating new key ; err = {:?}",
secret_path,
err
);
Ok(gen_key_and_try_write_to_file(&secret_path))
}
}
@@ -861,8 +850,7 @@ fn obtain_private_key(config: &NetworkConfiguration)
let mut key: [u8; 32] = [0; 32];
rand::rngs::EntropyRng::new().fill(&mut key);
Ok(secio::SecioKeyPair::secp256k1_raw_key(&key)
.expect("randomly-generated key with correct len should \
always be valid"))
.expect("randomly-generated key with correct len should always be valid"))
}
}
}
@@ -899,12 +887,17 @@ fn gen_key_and_try_write_to_file<P>(path: P) -> secio::SecioKeyPair
Ok(mut file) =>
match file.write_all(&raw_key) {
Ok(()) => (),
Err(err) => warn!(target: "sub-libp2p", "Failed to write \
secret key in file {:?} ; err = {:?}", path.as_ref(), err),
Err(err) => warn!(target: "sub-libp2p",
"Failed to write secret key in file {:?} ; err = {:?}",
path.as_ref(),
err
),
},
Err(err) =>
warn!(target: "sub-libp2p", "Failed to store secret key in file \
{:?} ; err = {:?}", path.as_ref(), err),
Err(err) => warn!(target: "sub-libp2p",
"Failed to store secret key in file {:?} ; err = {:?}",
path.as_ref(),
err
),
}
secio_key
@@ -943,13 +936,13 @@ mod tests {
let state = NetworkState::new(&Default::default()).unwrap();
let example_peer = PublicKey::Rsa(vec![1, 2, 3, 4]).into_peer_id();
let (peer_id, _) = state.custom_proto(
let (who, _) = state.custom_proto(
example_peer.clone(),
[1, 2, 3],
Endpoint::Dialer
).unwrap();
state.disable_peer(peer_id, "Just a test");
state.ban_peer(who, "Just a test");
assert!(state.custom_proto(
example_peer.clone(),
+133 -111
View File
@@ -16,7 +16,7 @@
use bytes::Bytes;
use {Error, ErrorKind, NetworkConfiguration, NetworkProtocolHandler};
use {NonReservedPeerMode, NetworkContext, Severity, PeerId, ProtocolId};
use {NonReservedPeerMode, NetworkContext, Severity, NodeIndex, ProtocolId};
use parking_lot::{Mutex, RwLock};
use libp2p;
use libp2p::multiaddr::{AddrComponent, Multiaddr};
@@ -77,7 +77,7 @@ struct Shared {
kad_upgrade: KadConnecConfig,
/// List of protocols available on the network. It is a logic error to
/// remote protocols from this list, and the code may assume that protocols
/// remove protocols from this list, and the code may assume that protocols
/// stay at the same index forever.
protocols: RwLock<RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>>>,
@@ -149,8 +149,7 @@ impl NetworkService {
// reach us
self.shared.original_listened_addr.read().as_ref()
.map(|addr|
format!("{}/p2p/{}", addr,
self.shared.kad_system.local_peer_id().to_base58())
format!("{}/p2p/{}", addr, self.shared.kad_system.local_peer_id().to_base58())
)
}
@@ -204,8 +203,7 @@ impl NetworkService {
let fut = match init_thread(core.handle(), shared,
timeouts_register_rx, close_rx) {
Ok(future) => {
debug!(target: "sub-libp2p", "Successfully started \
networking service");
debug!(target: "sub-libp2p", "Successfully started networking service");
let _ = init_tx.send(Ok(()));
future
},
@@ -217,8 +215,7 @@ impl NetworkService {
match core.run(fut) {
Ok(()) => debug!(target: "sub-libp2p", "libp2p future finished"),
Err(err) => error!(target: "sub-libp2p", "error while running \
libp2p: {:?}", err),
Err(err) => error!(target: "sub-libp2p", "error while running libp2p: {:?}", err),
}
});
@@ -234,8 +231,7 @@ impl NetworkService {
if let Some((close_tx, join)) = self.bg_thread.lock().take() {
let _ = close_tx.send(());
if let Err(e) = join.join() {
warn!(target: "sub-libp2p", "error while waiting on libp2p \
background thread: {:?}", e);
warn!(target: "sub-libp2p", "error while waiting on libp2p background thread: {:?}", e);
}
}
@@ -243,7 +239,7 @@ impl NetworkService {
}
/// Get a list of all connected peers by id.
pub fn connected_peers(&self) -> Vec<PeerId> {
pub fn connected_peers(&self) -> Vec<NodeIndex> {
self.shared.network_state.connected_peers()
}
@@ -295,18 +291,18 @@ impl Drop for NetworkService {
struct NetworkContextImpl {
inner: Arc<Shared>,
protocol: ProtocolId,
current_peer: Option<PeerId>,
current_peer: Option<NodeIndex>,
}
impl NetworkContext for NetworkContextImpl {
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) {
fn send(&self, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>) {
self.send_protocol(self.protocol, peer, packet_id, data)
}
fn send_protocol(
&self,
protocol: ProtocolId,
peer: PeerId,
peer: NodeIndex,
packet_id: PacketId,
data: Vec<u8>
) {
@@ -318,7 +314,8 @@ impl NetworkContext for NetworkContextImpl {
message.extend_from_slice(&[packet_id]);
message.extend_from_slice(&data);
if self.inner.network_state.send(protocol, peer, message).is_err() {
self.inner.network_state.drop_peer(peer, Some("Sending to peer failed"));
debug!(target: "sub-libp2p", "Sending to peer {} failed. Dropping.", peer);
self.inner.network_state.drop_peer(peer);
}
}
@@ -330,18 +327,24 @@ impl NetworkContext for NetworkContextImpl {
}
}
fn report_peer(&self, peer: PeerId, reason: Severity) {
fn report_peer(&self, peer: NodeIndex, reason: Severity) {
if let Some(info) = self.inner.network_state.peer_info(peer) {
if let (Some(client_version), Some(remote_address)) = (info.client_version, info.remote_address) {
info!(target: "sub-libp2p", "Peer {} ({} {}) reported by client: {}", peer, remote_address, client_version, reason);
info!(target: "sub-libp2p",
"Peer {} ({} {}) reported by client: {}",
peer,
remote_address,
client_version,
reason
);
} else {
info!(target: "sub-libp2p", "Peer {} reported by client: {}", peer, reason);
}
}
match reason {
Severity::Bad(reason) => self.inner.network_state.disable_peer(peer, reason),
Severity::Useless(reason) => self.inner.network_state.drop_peer(peer, Some(reason)),
Severity::Timeout => self.inner.network_state.drop_peer(peer, Some("Timeout waiting for response")),
Severity::Bad(reason) => self.inner.network_state.ban_peer(peer, reason),
Severity::Useless(_) => self.inner.network_state.drop_peer(peer),
Severity::Timeout => self.inner.network_state.drop_peer(peer),
}
}
@@ -368,17 +371,17 @@ impl NetworkContext for NetworkContextImpl {
Ok(())
}
fn peer_client_version(&self, peer: PeerId) -> String {
fn peer_client_version(&self, peer: NodeIndex) -> String {
// Devp2p returns "unknown" on unknown peer ID, so we do the same.
self.inner.network_state.peer_client_version(peer, self.protocol)
.unwrap_or_else(|| "unknown".to_string())
}
fn session_info(&self, peer: PeerId) -> Option<SessionInfo> {
fn session_info(&self, peer: NodeIndex) -> Option<SessionInfo> {
self.inner.network_state.session_info(peer, self.protocol)
}
fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option<u8> {
fn protocol_version(&self, protocol: ProtocolId, peer: NodeIndex) -> Option<u8> {
self.inner.network_state.protocol_version(peer, protocol)
}
@@ -394,7 +397,9 @@ impl NetworkContext for NetworkContextImpl {
fn init_thread(
core: Handle,
shared: Arc<Shared>,
timeouts_register_rx: mpsc::UnboundedReceiver<(Duration, (Arc<NetworkProtocolHandler + Send + Sync + 'static>, ProtocolId, TimerToken))>,
timeouts_register_rx: mpsc::UnboundedReceiver<
(Duration, (Arc<NetworkProtocolHandler + Send + Sync + 'static>, ProtocolId, TimerToken))
>,
close_rx: oneshot::Receiver<()>
) -> Result<impl Future<Item = (), Error = IoError>, Error> {
// Build the transport layer.
@@ -407,11 +412,10 @@ fn init_thread(
let addr_resolver = {
let shared = shared.clone();
move |peer_id| {
let addrs = shared.network_state.addrs_of_peer(&peer_id);
move |who| {
let addrs = shared.network_state.addrs_of_peer(&who);
for addr in &addrs {
trace!(target: "sub-libp2p", "{:?} resolved as {}",
peer_id, addr);
trace!(target: "sub-libp2p", "{:?} resolved as {}", who, addr);
}
addrs.into_iter()
}
@@ -473,8 +477,7 @@ fn init_thread(
*shared.original_listened_addr.write() = Some(new_addr.clone());
},
Err(_) => {
warn!(target: "sub-libp2p", "Can't listen on {}, protocol not \
supported", listen_addr);
warn!(target: "sub-libp2p", "Can't listen on {}, protocol not supported", listen_addr);
return Err(ErrorKind::BadProtocol.into())
},
}
@@ -482,14 +485,14 @@ fn init_thread(
// Explicitely connect to _all_ the boostrap nodes as a temporary measure.
for bootnode in shared.config.boot_nodes.iter() {
match shared.network_state.add_peer(bootnode) {
Ok(peer_id) => {
trace!(target: "sub-libp2p", "Dialing bootnode {:?}", peer_id);
Ok(who) => {
trace!(target: "sub-libp2p", "Dialing bootnode {:?}", who);
for proto in shared.protocols.read().0.clone().into_iter() {
open_peer_custom_proto(
shared.clone(),
transport.clone(),
proto,
peer_id.clone(),
who.clone(),
&swarm_controller
)
}
@@ -508,7 +511,7 @@ fn init_thread(
};
if let Ok(addr) = multi {
trace!(target: "sub-libp2p", "Missing PeerId for Bootnode {:}. Querying", bootnode);
trace!(target: "sub-libp2p", "Missing NodeIndex for Bootnode {:}. Querying", bootnode);
for proto in shared.protocols.read().0.clone().into_iter() {
connect_with_query_peer_id(
shared.clone(),
@@ -520,7 +523,7 @@ fn init_thread(
}
} else {
warn!(target: "sub-libp2p", "Not a valid Bootnode Address {:}", bootnode);
continue;
continue;
}
},
Err(err) => warn!(target:"sub-libp2p", "Couldn't parse Bootnode Address: {}", err),
@@ -560,8 +563,7 @@ fn init_thread(
.select(close_rx.then(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err)
.and_then(move |_| {
debug!(target: "sub-libp2p", "Networking ended ; disconnecting \
all peers");
debug!(target: "sub-libp2p", "Networking ended ; disconnecting all peers");
shared.network_state.disconnect_all();
Ok(())
}))
@@ -593,8 +595,7 @@ fn listener_handle<'a, C>(
where C: AsyncRead + AsyncWrite + 'a {
match upgrade {
FinalUpgrade::Kad(controller, kademlia_stream, client_addr) => {
trace!(target: "sub-libp2p", "Opened kademlia substream with {:?}",
client_addr);
trace!(target: "sub-libp2p", "Opened kademlia substream with {:?}", client_addr);
match handle_kademlia_connection(shared, client_addr, controller, kademlia_stream) {
Ok(fut) => Box::new(fut) as Box<_>,
Err(err) => Box::new(future::err(err)) as Box<_>,
@@ -624,8 +625,7 @@ fn listener_handle<'a, C>(
let node_id = p2p_multiaddr_to_node_id(client_addr);
match shared.network_state.ping_connection(node_id.clone()) {
Ok((_, ping_connec)) => {
trace!(target: "sub-libp2p", "Successfully opened ping \
substream with {:?}", node_id);
trace!(target: "sub-libp2p", "Successfully opened ping substream with {:?}", node_id);
let fut = ping_connec.set_until(pinger, future);
Box::new(fut) as Box<_>
},
@@ -650,7 +650,7 @@ fn handle_kademlia_connection(
kademlia_stream: Box<Stream<Item = KadIncomingRequest, Error = IoError>>
) -> Result<impl Future<Item = (), Error = IoError>, IoError> {
let node_id = p2p_multiaddr_to_node_id(client_addr);
let (peer_id, kad_connec) = shared.network_state
let (who, kad_connec) = shared.network_state
.kad_connection(node_id.clone())?;
let node_id2 = node_id.clone();
@@ -679,8 +679,7 @@ fn handle_kademlia_connection(
Ok(future::Loop::Continue(rest))
})
}).then(move |val| {
trace!(target: "sub-libp2p", "Closed Kademlia connection \
with #{} {:?} => {:?}", peer_id, node_id2, val);
trace!(target: "sub-libp2p", "Closed Kademlia connection with #{} {:?} => {:?}", who, node_id2, val);
val
});
@@ -695,16 +694,16 @@ fn build_kademlia_response(
) -> Vec<KadPeer> {
shared.kad_system
.known_closest_peers(searched)
.map(move |peer_id| {
if peer_id == *shared.kad_system.local_peer_id() {
.map(move |who| {
if who == *shared.kad_system.local_peer_id() {
KadPeer {
node_id: peer_id.clone(),
node_id: who.clone(),
multiaddrs: shared.listened_addrs.read().clone(),
connection_ty: KadConnectionType::Connected,
}
} else {
let addrs = shared.network_state.addrs_of_peer(&peer_id);
let connec_ty = if shared.network_state.has_connection(&peer_id) {
let addrs = shared.network_state.addrs_of_peer(&who);
let connec_ty = if shared.network_state.has_connection(&who) {
// TODO: this only checks connections with substrate ; but what
// if we're connected through Kademlia only?
KadConnectionType::Connected
@@ -713,7 +712,7 @@ fn build_kademlia_response(
};
KadPeer {
node_id: peer_id.clone(),
node_id: who.clone(),
multiaddrs: addrs,
connection_ty: connec_ty,
}
@@ -749,7 +748,7 @@ fn handle_custom_connection(
// TODO: is there a better way to refuse connections than to drop the
// newly-opened substream? should we refuse the connection
// beforehand?
let (peer_id, unique_connec) = match shared.network_state.custom_proto(
let (who, unique_connec) = match shared.network_state.custom_proto(
node_id.clone(),
protocol_id,
custom_proto_out.endpoint,
@@ -759,14 +758,17 @@ fn handle_custom_connection(
};
if let UniqueConnecState::Full = unique_connec.state() {
debug!(target: "sub-libp2p", "Interrupting connection attempt to {:?} \
with {:?} because we're already connected", node_id, custom_proto_out.protocol_id);
debug!(target: "sub-libp2p",
"Interrupting connection attempt to {:?} with {:?} because we're already connected",
node_id,
custom_proto_out.protocol_id
);
return future::Either::A(future::ok(()))
}
struct ProtoDisconnectGuard {
inner: Arc<Shared>,
peer_id: PeerId,
who: NodeIndex,
node_id: PeerstorePeerId,
handler: Arc<NetworkProtocolHandler + Send + Sync>,
protocol: ProtocolId
@@ -774,27 +776,27 @@ fn handle_custom_connection(
impl Drop for ProtoDisconnectGuard {
fn drop(&mut self) {
debug!(target: "sub-libp2p",
info!(target: "sub-libp2p",
"Node {:?} with peer ID {} through protocol {:?} disconnected",
self.node_id,
self.peer_id,
self.who,
self.protocol
);
self.handler.disconnected(&NetworkContextImpl {
inner: self.inner.clone(),
protocol: self.protocol,
current_peer: Some(self.peer_id),
}, &self.peer_id);
current_peer: Some(self.who),
}, &self.who);
// When any custom protocol drops, we drop the peer entirely.
// TODO: is this correct?
self.inner.network_state.drop_peer(self.peer_id, Some("Remote end disconnected"));
self.inner.network_state.drop_peer(self.who);
}
}
let dc_guard = ProtoDisconnectGuard {
inner: shared.clone(),
peer_id,
who,
node_id: node_id.clone(),
handler: handler.clone(),
protocol: protocol_id,
@@ -810,8 +812,8 @@ fn handle_custom_connection(
handler.read(&NetworkContextImpl {
inner: shared.clone(),
protocol: protocol_id,
current_peer: Some(peer_id.clone()),
}, &peer_id, packet_id, &data);
current_peer: Some(who.clone()),
}, &who, packet_id, &data);
Ok(())
}
});
@@ -824,15 +826,19 @@ fn handle_custom_connection(
val
});
debug!(target: "sub-libp2p", "Successfully connected to {:?} (peer id \
{}) with protocol {:?} version {}", node_id, peer_id, protocol_id,
custom_proto_out.protocol_version);
debug!(target: "sub-libp2p",
"Successfully connected to {:?} (peer id {}) with protocol {:?} version {}",
node_id,
who,
protocol_id,
custom_proto_out.protocol_version
);
handler.connected(&NetworkContextImpl {
inner: shared.clone(),
protocol: protocol_id,
current_peer: Some(peer_id),
}, &peer_id);
current_peer: Some(who),
}, &who);
future::Either::B(final_fut)
}
@@ -869,10 +875,10 @@ fn start_kademlia_discovery<T, To, St, C>(shared: Arc<Shared>, transport: T,
let shared = shared.clone();
let transport = transport.clone();
let swarm_controller = swarm_controller.clone();
move |peer_id|
move |who|
obtain_kad_connection(
shared.clone(),
peer_id.clone(),
who.clone(),
transport.clone(),
swarm_controller.clone()
)
@@ -938,8 +944,7 @@ fn perform_kademlia_query<T, To, St, C>(
let random_key = PublicKey::Ed25519((0 .. 32)
.map(|_| -> u8 { rand::random() }).collect());
let random_peer_id = random_key.into_peer_id();
trace!(target: "sub-libp2p", "Start kademlia discovery for {:?}",
random_peer_id);
trace!(target: "sub-libp2p", "Start kademlia discovery for {:?}", random_peer_id);
shared.clone()
.kad_system
@@ -947,7 +952,7 @@ fn perform_kademlia_query<T, To, St, C>(
let shared = shared.clone();
let transport = transport.clone();
let swarm_controller = swarm_controller.clone();
move |peer_id| obtain_kad_connection(shared.clone(), peer_id.clone(),
move |who| obtain_kad_connection(shared.clone(), who.clone(),
transport.clone(), swarm_controller.clone())
})
.filter_map(move |event|
@@ -980,8 +985,10 @@ fn connect_to_nodes<T, To, St, C>(
St: MuxedTransport<Output = FinalUpgrade<C>> + Clone + 'static,
C: 'static {
let num_slots = shared.network_state.should_open_outgoing_custom_connections();
debug!(target: "sub-libp2p", "Outgoing connections cycle ; opening up to \
{} outgoing connections", num_slots);
debug!(target: "sub-libp2p",
"Outgoing connections cycle ; opening up to {} outgoing connections",
num_slots
);
for _ in 0 .. num_slots {
// Choose a random peer. We are potentially already connected to
@@ -1034,8 +1041,11 @@ fn connect_with_query_peer_id<T, To, St, C>(
.and_then(move |info| {
let _ = process_identify_info(shared, &info, original_addr,
endpoint, &base_transport);
trace!(target: "sub-libp2p", "Bootnode {:} found with peer id: {:?}",
addr2, info.info.public_key.into_peer_id());
trace!(target: "sub-libp2p",
"Bootnode {:} found with peer id: {:?}",
addr2,
info.info.public_key.into_peer_id()
);
upgrade::apply(socket, proto, endpoint, client_addr)
})
})
@@ -1050,15 +1060,19 @@ fn connect_with_query_peer_id<T, To, St, C>(
.map_err({
let addr = addr.clone();
move |err| {
warn!(target: "sub-libp2p", "Error while dialing {:?} to query peer id: {:?}",
addr, err);
warn!(target: "sub-libp2p",
"Error while dialing {:?} to query peer id: {:?}",
addr,
err
);
err
}
});
let _ = swarm_controller.dial(addr.clone(), with_err)
.map_err( move |err| warn!(target: "sub-libp2p",
"Error when querying peer node info {:} of {:}", err, addr));
let _ = swarm_controller.dial(addr.clone(), with_err)
.map_err(move |err|
warn!(target: "sub-libp2p", "Error when querying peer node info {:} of {:}", err, addr)
);
}
/// If necessary, dials the given address for the given protocol and using the
@@ -1081,8 +1095,7 @@ fn open_peer_custom_proto<T, To, St, C>(
// Don't connect to ourselves.
// TODO: remove this eventually
if &expected_peer_id == shared.kad_system.local_peer_id() {
trace!(target: "sub-libp2p", "Skipped connecting to {:?} because \
it is ourselves", expected_peer_id);
trace!(target: "sub-libp2p", "Skipped connecting to {:?} because it is ourselves", expected_peer_id);
return
}
@@ -1102,13 +1115,14 @@ fn open_peer_custom_proto<T, To, St, C>(
if info.info.public_key.into_peer_id() == expected_peer_id {
Ok(socket)
} else {
debug!(target: "sub-libp2p", "Public key mismatch for \
node {:?} with proto {:?}", expected_peer_id, proto_id);
trace!(target: "sub-libp2p", "Removing addr {} for {:?}",
original_addr, expected_peer_id);
debug!(target: "sub-libp2p",
"Public key mismatch for node {:?} with proto {:?}",
expected_peer_id,
proto_id
);
trace!(target: "sub-libp2p", "Removing addr {} for {:?}", original_addr, expected_peer_id);
shared.network_state.set_invalid_kad_address(&expected_peer_id, &original_addr);
Err(IoError::new(IoErrorKind::InvalidData, "public \
key mismatch when identifyed peer"))
Err(IoError::new(IoErrorKind::InvalidData, "public key mismatch when identifyed peer"))
}
)
.and_then(move |socket|
@@ -1128,32 +1142,39 @@ fn open_peer_custom_proto<T, To, St, C>(
.map_err({
let node_id = node_id.clone();
move |err| {
debug!(target: "sub-libp2p", "Error while dialing \
{:?} with custom proto: {:?}", node_id, err);
debug!(target: "sub-libp2p", "Error while dialing {:?} with custom proto: {:?}", node_id, err);
err
}
});
match shared2.network_state.custom_proto(node_id.clone(), proto_id, Endpoint::Dialer) {
Ok((peer_id, unique_connec)) => {
Ok((who, unique_connec)) => {
if !unique_connec.is_alive() {
trace!(target: "sub-libp2p", "Opening connection to #{} {:?} with \
proto {:?}", peer_id, node_id, proto_id);
trace!(target: "sub-libp2p",
"Opening connection to #{} {:?} with proto {:?}",
who,
node_id,
proto_id
);
}
// TODO: this future should be used
let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err);
},
Err(err) => {
trace!(target: "sub-libp2p", "Error while opening connection to
{:?} with proto {:?} => {:?}", node_id, proto_id, err);
trace!(target: "sub-libp2p",
"Error while opening connection to {:?} with proto {:?} => {:?}",
node_id,
proto_id,
err
);
},
}
}
/// Obtain a Kademlia connection to the given peer.
fn obtain_kad_connection<T, To, St, C>(shared: Arc<Shared>,
peer_id: PeerstorePeerId, transport: T, swarm_controller: SwarmController<St>)
who: PeerstorePeerId, transport: T, swarm_controller: SwarmController<St>)
-> impl Future<Item = KadConnecController, Error = IoError>
where T: MuxedTransport<Output = TransportOutput<To>> + Clone + 'static,
T::MultiaddrFuture: 'static,
@@ -1161,7 +1182,7 @@ fn obtain_kad_connection<T, To, St, C>(shared: Arc<Shared>,
St: MuxedTransport<Output = FinalUpgrade<C>> + Clone + 'static,
C: 'static {
let kad_upgrade = shared.kad_upgrade.clone();
let addr: Multiaddr = AddrComponent::P2P(peer_id.clone().into_bytes()).into();
let addr: Multiaddr = AddrComponent::P2P(who.clone().into_bytes()).into();
let transport = transport
.and_then(move |out, endpoint, client_addr|
upgrade::apply(out.socket, kad_upgrade.clone(),
@@ -1175,7 +1196,7 @@ fn obtain_kad_connection<T, To, St, C>(shared: Arc<Shared>,
});
shared.network_state
.kad_connection(peer_id.clone())
.kad_connection(who.clone())
.into_future()
.map(move |(_, k)| k.get_or_dial(&swarm_controller, &addr, transport))
.flatten()
@@ -1204,14 +1225,15 @@ fn process_identify_info(
if let Some(mut ext_addr) = transport.nat_traversal(original_listened_addr, &info.observed_addr) {
let mut listened_addrs = shared.listened_addrs.write();
if !listened_addrs.iter().any(|a| a == &ext_addr) {
trace!(target: "sub-libp2p", "NAT traversal: remote observes us as \
{} ; registering {} as one of our own addresses",
info.observed_addr, ext_addr);
trace!(target: "sub-libp2p",
"NAT traversal: remote observes us as {}; registering {} as one of our own addresses",
info.observed_addr,
ext_addr
);
listened_addrs.push(ext_addr.clone());
ext_addr.append(AddrComponent::P2P(shared.kad_system
.local_peer_id().clone().into_bytes()));
info!(target: "sub-libp2p", "New external node address: {}",
ext_addr);
info!(target: "sub-libp2p", "New external node address: {}", ext_addr);
}
}
}
@@ -1273,16 +1295,16 @@ fn ping_all<T, St, C>(
C: 'static {
let mut ping_futures = Vec::new();
for (peer, peer_id, pinger) in shared.network_state.cleanup_and_prepare_ping() {
for (peer, who, pinger) in shared.network_state.cleanup_and_prepare_ping() {
let shared = shared.clone();
let addr = Multiaddr::from(AddrComponent::P2P(peer_id.clone().into_bytes()));
let addr = Multiaddr::from(AddrComponent::P2P(who.clone().into_bytes()));
let fut = pinger
.get_or_dial(&swarm_controller, &addr, transport.clone())
.and_then(move |mut p| {
trace!(target: "sub-libp2p", "Pinging peer #{} aka. {:?}", peer, peer_id);
trace!(target: "sub-libp2p", "Pinging peer #{} aka. {:?}", peer, who);
p.ping()
.map(|()| peer_id)
.map(|()| who)
.map_err(|err| IoError::new(IoErrorKind::Other, err))
});
let ping_start_time = Instant::now();
@@ -1291,15 +1313,15 @@ fn ping_all<T, St, C>(
match val {
Err(err) => {
trace!(target: "sub-libp2p", "Error while pinging #{:?} => {:?}", peer, err);
shared.network_state.drop_peer(peer, None); // None so that we don't print messages on such low-level issues.
shared.network_state.report_ping_failed(peer);
// Return Ok, otherwise we would close the ping service
Ok(())
},
Ok(peer_id) => {
Ok(who) => {
let elapsed = ping_start_time.elapsed();
trace!(target: "sub-libp2p", "Pong from #{:?} in {:?}", peer, elapsed);
shared.network_state.report_ping_duration(peer, elapsed);
shared.kad_system.update_kbuckets(peer_id);
shared.kad_system.update_kbuckets(who);
Ok(())
}
}
@@ -36,7 +36,8 @@ pub type ProtocolId = [u8; 3];
pub type NodeId = H512;
/// Local (temporary) peer session ID.
pub type PeerId = usize;
/// RENAME TO NodeIndex
pub type NodeIndex = usize;
/// Messages used to communitate with the event loop from other threads.
#[derive(Clone)]
@@ -62,9 +63,9 @@ pub enum NetworkIoMessage {
/// Initliaze public interface.
InitPublicInterface,
/// Disconnect a peer.
Disconnect(PeerId),
Disconnect(NodeIndex),
/// Disconnect and temporary disable peer.
DisablePeer(PeerId),
DisablePeer(NodeIndex),
/// Network has been started with the host as the given enode.
NetworkStarted(String),
}
@@ -240,16 +241,16 @@ impl<'a> fmt::Display for Severity<'a> {
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub trait NetworkContext {
/// Send a packet over the network to another peer.
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>);
fn send(&self, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>);
/// Send a packet over the network to another peer using specified protocol.
fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>);
fn send_protocol(&self, protocol: ProtocolId, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>);
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
fn respond(&self, packet_id: PacketId, data: Vec<u8>);
/// Report peer. Depending on the report, peer may be disconnected and possibly banned.
fn report_peer(&self, peer: PeerId, reason: Severity);
fn report_peer(&self, peer: NodeIndex, reason: Severity);
/// Check if the session is still active.
fn is_expired(&self) -> bool;
@@ -258,24 +259,24 @@ pub trait NetworkContext {
fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error>;
/// Returns peer identification string
fn peer_client_version(&self, peer: PeerId) -> String;
fn peer_client_version(&self, peer: NodeIndex) -> String;
/// Returns information on p2p session
fn session_info(&self, peer: PeerId) -> Option<SessionInfo>;
fn session_info(&self, peer: NodeIndex) -> Option<SessionInfo>;
/// Returns max version for a given protocol.
fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option<u8>;
fn protocol_version(&self, protocol: ProtocolId, peer: NodeIndex) -> Option<u8>;
/// Returns this object's subprotocol name.
fn subprotocol_name(&self) -> ProtocolId;
}
impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext {
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) {
fn send(&self, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>) {
(**self).send(peer, packet_id, data)
}
fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) {
fn send_protocol(&self, protocol: ProtocolId, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>) {
(**self).send_protocol(protocol, peer, packet_id, data)
}
@@ -283,7 +284,7 @@ impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext {
(**self).respond(packet_id, data)
}
fn report_peer(&self, peer: PeerId, reason: Severity) {
fn report_peer(&self, peer: NodeIndex, reason: Severity) {
(**self).report_peer(peer, reason)
}
@@ -295,15 +296,15 @@ impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext {
(**self).register_timer(token, delay)
}
fn peer_client_version(&self, peer: PeerId) -> String {
fn peer_client_version(&self, peer: NodeIndex) -> String {
(**self).peer_client_version(peer)
}
fn session_info(&self, peer: PeerId) -> Option<SessionInfo> {
fn session_info(&self, peer: NodeIndex) -> Option<SessionInfo> {
(**self).session_info(peer)
}
fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option<u8> {
fn protocol_version(&self, protocol: ProtocolId, peer: NodeIndex) -> Option<u8> {
(**self).protocol_version(protocol, peer)
}
@@ -319,11 +320,11 @@ pub trait NetworkProtocolHandler: Sync + Send {
/// Initialize the handler
fn initialize(&self, _io: &NetworkContext) {}
/// Called when new network packet received.
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]);
fn read(&self, io: &NetworkContext, peer: &NodeIndex, packet_id: u8, data: &[u8]);
/// Called when new peer is connected. Only called when peer supports the same protocol.
fn connected(&self, io: &NetworkContext, peer: &PeerId);
fn connected(&self, io: &NetworkContext, peer: &NodeIndex);
/// Called when a previously connected peer disconnects.
fn disconnected(&self, io: &NetworkContext, peer: &PeerId);
fn disconnected(&self, io: &NetworkContext, peer: &NodeIndex);
/// Timer function called after a timeout created with `NetworkContext::timeout`.
fn timeout(&self, _io: &NetworkContext, _timer: TimerToken) {}
}
@@ -66,12 +66,12 @@ impl NetworkProtocolHandler for TestProtocol {
io.register_timer(0, Duration::from_millis(10)).unwrap();
}
fn read(&self, _io: &NetworkContext, _peer: &PeerId, packet_id: u8, data: &[u8]) {
fn read(&self, _io: &NetworkContext, _peer: &NodeIndex, packet_id: u8, data: &[u8]) {
assert_eq!(packet_id, 33);
self.packet.lock().extend(data);
}
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
fn connected(&self, io: &NetworkContext, peer: &NodeIndex) {
if self.drop_session {
io.report_peer(*peer, Severity::Bad("We are evil and just want to drop"))
} else {
@@ -79,7 +79,7 @@ impl NetworkProtocolHandler for TestProtocol {
}
}
fn disconnected(&self, _io: &NetworkContext, _peer: &PeerId) {
fn disconnected(&self, _io: &NetworkContext, _peer: &NodeIndex) {
self.got_disconnect.store(true, AtomicOrdering::Relaxed);
}
+10 -10
View File
@@ -19,7 +19,7 @@ use std::cmp;
use std::ops::Range;
use std::collections::{HashMap, BTreeMap};
use std::collections::hash_map::Entry;
use network_libp2p::PeerId;
use network_libp2p::NodeIndex;
use runtime_primitives::traits::{Block as BlockT, NumberFor, As};
use message;
@@ -29,7 +29,7 @@ const MAX_PARALLEL_DOWNLOADS: u32 = 1;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockData<B: BlockT> {
pub block: message::BlockData<B>,
pub origin: PeerId,
pub origin: NodeIndex,
}
#[derive(Debug)]
@@ -55,7 +55,7 @@ impl<B: BlockT> BlockRangeState<B> {
pub struct BlockCollection<B: BlockT> {
/// Downloaded blocks.
blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>,
peer_requests: HashMap<PeerId, NumberFor<B>>,
peer_requests: HashMap<NodeIndex, NumberFor<B>>,
}
impl<B: BlockT> BlockCollection<B> {
@@ -74,7 +74,7 @@ impl<B: BlockT> BlockCollection<B> {
}
/// Insert a set of blocks into collection.
pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, peer_id: PeerId) {
pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, who: NodeIndex) {
if blocks.is_empty() {
return;
}
@@ -92,11 +92,11 @@ impl<B: BlockT> BlockCollection<B> {
_ => (),
}
self.blocks.insert(start, BlockRangeState::Complete(blocks.into_iter().map(|b| BlockData { origin: peer_id, block: b }).collect()));
self.blocks.insert(start, BlockRangeState::Complete(blocks.into_iter().map(|b| BlockData { origin: who, block: b }).collect()));
}
/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
pub fn needed_blocks(&mut self, peer_id: PeerId, count: usize, peer_best: NumberFor<B>, common: NumberFor<B>) -> Option<Range<NumberFor<B>>> {
pub fn needed_blocks(&mut self, who: NodeIndex, count: usize, peer_best: NumberFor<B>, common: NumberFor<B>) -> Option<Range<NumberFor<B>>> {
// First block number that we need to download
let first_different = common + As::sa(1);
let count = As::sa(count as u64);
@@ -125,11 +125,11 @@ impl<B: BlockT> BlockCollection<B> {
};
// crop to peers best
if range.start > peer_best {
trace!(target: "sync", "Out of range for peer {} ({} vs {})", peer_id, range.start, peer_best);
trace!(target: "sync", "Out of range for peer {} ({} vs {})", who, range.start, peer_best);
return None;
}
range.end = cmp::min(peer_best + As::sa(1), range.end);
self.peer_requests.insert(peer_id, range.start);
self.peer_requests.insert(who, range.start);
self.blocks.insert(range.start, BlockRangeState::Downloading{ len: range.end - range.start, downloading: downloading + 1 });
if range.end <= range.start {
panic!("Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}", range, count, peer_best, common, self.blocks);
@@ -162,8 +162,8 @@ impl<B: BlockT> BlockCollection<B> {
drained
}
pub fn clear_peer_download(&mut self, peer_id: PeerId) {
match self.peer_requests.entry(peer_id) {
pub fn clear_peer_download(&mut self, who: NodeIndex) {
match self.peer_requests.entry(who) {
Entry::Occupied(entry) => {
let start = entry.remove();
let remove = match self.blocks.get_mut(&start) {
@@ -20,7 +20,7 @@
use std::collections::{HashMap, HashSet};
use futures::sync::mpsc;
use std::time::{Instant, Duration};
use network_libp2p::PeerId;
use network_libp2p::NodeIndex;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
use runtime_primitives::generic::BlockId;
use message::{self, generic::Message as GenericMessage};
@@ -51,7 +51,7 @@ struct MessageEntry<B: BlockT> {
/// Consensus network protocol handler. Manages statements and candidate requests.
pub struct ConsensusGossip<B: BlockT> {
peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
peers: HashMap<NodeIndex, PeerConsensus<B::Hash>>,
message_sink: Option<(mpsc::UnboundedSender<ConsensusMessage<B>>, B::Hash)>,
messages: Vec<MessageEntry<B>>,
message_hashes: HashSet<B::Hash>,
@@ -74,9 +74,9 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
}
/// Handle new connected peer.
pub fn new_peer(&mut self, protocol: &mut Context<B>, peer_id: PeerId, roles: Roles) {
pub fn new_peer(&mut self, protocol: &mut Context<B>, who: NodeIndex, roles: Roles) {
if roles.intersects(Roles::AUTHORITY | Roles::FULL) {
trace!(target:"gossip", "Registering {:?} {}", roles, peer_id);
trace!(target:"gossip", "Registering {:?} {}", roles, who);
// Send out all known messages.
// TODO: limit by size
let mut known_messages = HashSet::new();
@@ -87,9 +87,9 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
ConsensusMessage::ChainSpecific(ref msg, _) => GenericMessage::ChainSpecific(msg.clone()),
};
protocol.send_message(peer_id, message);
protocol.send_message(who, message);
}
self.peers.insert(peer_id, PeerConsensus {
self.peers.insert(who, PeerConsensus {
known_messages,
});
}
@@ -115,16 +115,16 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
}
/// Handles incoming BFT message, passing to stream and repropagating.
pub fn on_bft_message(&mut self, protocol: &mut Context<B>, peer_id: PeerId, message: message::LocalizedBftMessage<B>) {
if let Some((hash, message)) = self.handle_incoming(protocol, peer_id, ConsensusMessage::Bft(message)) {
pub fn on_bft_message(&mut self, protocol: &mut Context<B>, who: NodeIndex, message: message::LocalizedBftMessage<B>) {
if let Some((hash, message)) = self.handle_incoming(protocol, who, ConsensusMessage::Bft(message)) {
// propagate to other peers.
self.multicast(protocol, message, Some(hash));
}
}
/// Handles incoming chain-specific message and repropagates
pub fn on_chain_specific(&mut self, protocol: &mut Context<B>, peer_id: PeerId, message: Vec<u8>, parent_hash: B::Hash) {
if let Some((hash, message)) = self.handle_incoming(protocol, peer_id, ConsensusMessage::ChainSpecific(message, parent_hash)) {
pub fn on_chain_specific(&mut self, protocol: &mut Context<B>, who: NodeIndex, message: Vec<u8>, parent_hash: B::Hash) {
if let Some((hash, message)) = self.handle_incoming(protocol, who, ConsensusMessage::ChainSpecific(message, parent_hash)) {
// propagate to other peers.
self.multicast(protocol, message, Some(hash));
}
@@ -163,8 +163,8 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
}
/// Call when a peer has been disconnected to stop tracking gossip status.
pub fn peer_disconnected(&mut self, _protocol: &mut Context<B>, peer_id: PeerId) {
self.peers.remove(&peer_id);
pub fn peer_disconnected(&mut self, _protocol: &mut Context<B>, who: NodeIndex) {
self.peers.remove(&who);
}
/// Prune old or no longer relevant consensus messages.
@@ -195,7 +195,7 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
}
}
fn handle_incoming(&mut self, protocol: &mut Context<B>, peer_id: PeerId, message: ConsensusMessage<B>) -> Option<(B::Hash, ConsensusMessage<B>)> {
fn handle_incoming(&mut self, protocol: &mut Context<B>, who: NodeIndex, message: ConsensusMessage<B>) -> Option<(B::Hash, ConsensusMessage<B>)> {
let (hash, parent, message) = match message {
ConsensusMessage::Bft(msg) => {
let parent = msg.parent_hash;
@@ -223,7 +223,7 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
};
if self.message_hashes.contains(&hash) {
trace!(target:"gossip", "Ignored already known message from {}", peer_id);
trace!(target:"gossip", "Ignored already known message from {}", who);
return None;
}
@@ -234,14 +234,14 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
},
(Ok(info), Ok(Some(header))) => {
if header.number() < &info.chain.best_number {
trace!(target:"gossip", "Ignored ancient message from {}, hash={}", peer_id, parent);
trace!(target:"gossip", "Ignored ancient message from {}, hash={}", who, parent);
return None;
}
},
(Ok(_), Ok(None)) => {},
}
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
peer.known_messages.insert(hash);
if let Some((sink, parent_hash)) = self.message_sink.take() {
if parent == parent_hash {
@@ -253,7 +253,7 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
self.message_sink = Some((sink, parent_hash));
}
} else {
trace!(target:"gossip", "Ignored statement from unregistered peer {}", peer_id);
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
return None;
}
+15 -15
View File
@@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use parking_lot::{Condvar, Mutex, RwLock};
use client::{BlockOrigin, BlockStatus, ImportResult};
use network_libp2p::{PeerId, Severity};
use network_libp2p::{NodeIndex, Severity};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero};
@@ -202,9 +202,9 @@ trait SyncLinkApi<B: BlockT> {
/// Maintain sync.
fn maintain_sync(&mut self);
/// Disconnect from peer.
fn useless_peer(&mut self, peer_id: PeerId, reason: &str);
fn useless_peer(&mut self, who: NodeIndex, reason: &str);
/// Disconnect from peer and restart sync.
fn note_useless_and_restart_sync(&mut self, peer_id: PeerId, reason: &str);
fn note_useless_and_restart_sync(&mut self, who: NodeIndex, reason: &str);
/// Restart sync.
fn restart(&mut self);
}
@@ -233,9 +233,9 @@ enum BlockImportResult<H: ::std::fmt::Debug + PartialEq, N: ::std::fmt::Debug +
#[derive(Debug, PartialEq)]
enum BlockImportError {
/// Disconnect from peer and continue import of next bunch of blocks.
Disconnect(PeerId),
Disconnect(NodeIndex),
/// Disconnect from peer and restart sync.
DisconnectAndRestart(PeerId),
DisconnectAndRestart(NodeIndex),
/// Restart sync.
Restart,
}
@@ -356,16 +356,16 @@ fn process_import_result<'a, B: BlockT>(
link.block_imported(&hash, number);
1
},
Err(BlockImportError::Disconnect(peer_id)) => {
Err(BlockImportError::Disconnect(who)) => {
// TODO: FIXME: @arkpar BlockImport shouldn't be trying to manage the peer set.
// This should contain an actual reason.
link.useless_peer(peer_id, "Import result was stated Disconnect");
link.useless_peer(who, "Import result was stated Disconnect");
0
},
Err(BlockImportError::DisconnectAndRestart(peer_id)) => {
Err(BlockImportError::DisconnectAndRestart(who)) => {
// TODO: FIXME: @arkpar BlockImport shouldn't be trying to manage the peer set.
// This should contain an actual reason.
link.note_useless_and_restart_sync(peer_id, "Import result was stated DisconnectAndRestart");
link.note_useless_and_restart_sync(who, "Import result was stated DisconnectAndRestart");
0
},
Err(BlockImportError::Restart) => {
@@ -408,13 +408,13 @@ impl<'a, B: 'static + BlockT, E: ExecuteInContext<B>> SyncLinkApi<B> for SyncLin
self.with_sync(|sync, protocol| sync.maintain_sync(protocol))
}
fn useless_peer(&mut self, peer_id: PeerId, reason: &str) {
self.with_sync(|_, protocol| protocol.report_peer(peer_id, Severity::Useless(reason)))
fn useless_peer(&mut self, who: NodeIndex, reason: &str) {
self.with_sync(|_, protocol| protocol.report_peer(who, Severity::Useless(reason)))
}
fn note_useless_and_restart_sync(&mut self, peer_id: PeerId, reason: &str) {
fn note_useless_and_restart_sync(&mut self, who: NodeIndex, reason: &str) {
self.with_sync(|sync, protocol| {
protocol.report_peer(peer_id, Severity::Useless(reason)); // is this actually malign or just useless?
protocol.report_peer(who, Severity::Useless(reason)); // is this actually malign or just useless?
sync.restart(protocol);
})
}
@@ -490,8 +490,8 @@ pub mod tests {
fn chain(&self) -> &Client<Block> { &*self.chain }
fn block_imported(&mut self, _hash: &Hash, _number: NumberFor<Block>) { self.imported += 1; }
fn maintain_sync(&mut self) { self.maintains += 1; }
fn useless_peer(&mut self, _: PeerId, _: &str) { self.disconnects += 1; }
fn note_useless_and_restart_sync(&mut self, _: PeerId, _: &str) { self.disconnects += 1; self.restarts += 1; }
fn useless_peer(&mut self, _: NodeIndex, _: &str) { self.disconnects += 1; }
fn note_useless_and_restart_sync(&mut self, _: NodeIndex, _: &str) { self.disconnects += 1; self.restarts += 1; }
fn restart(&mut self) { self.restarts += 1; }
}
+14 -14
View File
@@ -14,21 +14,21 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
use network_libp2p::{NetworkContext, Severity, PeerId, SessionInfo};
use network_libp2p::{NetworkContext, Severity, NodeIndex, SessionInfo};
/// IO interface for the syncing handler.
/// Provides peer connection management and an interface to the blockchain client.
pub trait SyncIo {
/// Report a peer for misbehaviour.
fn report_peer(&mut self, peer_id: PeerId, reason: Severity);
fn report_peer(&mut self, who: NodeIndex, reason: Severity);
/// Send a packet to a peer.
fn send(&mut self, peer_id: PeerId, data: Vec<u8>);
fn send(&mut self, who: NodeIndex, data: Vec<u8>);
/// Returns peer identifier string
fn peer_info(&self, peer_id: PeerId) -> String {
peer_id.to_string()
fn peer_info(&self, who: NodeIndex) -> String {
who.to_string()
}
/// Returns information on p2p session
fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo>;
fn peer_session_info(&self, who: NodeIndex) -> Option<SessionInfo>;
/// Check if the session is expired
fn is_expired(&self) -> bool;
}
@@ -48,24 +48,24 @@ impl<'s> NetSyncIo<'s> {
}
impl<'s> SyncIo for NetSyncIo<'s> {
fn report_peer(&mut self, peer_id: PeerId, reason: Severity) {
self.network.report_peer(peer_id, reason);
fn report_peer(&mut self, who: NodeIndex, reason: Severity) {
self.network.report_peer(who, reason);
}
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) {
self.network.send(peer_id, 0, data)
fn send(&mut self, who: NodeIndex, data: Vec<u8>) {
self.network.send(who, 0, data)
}
fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo> {
self.network.session_info(peer_id)
fn peer_session_info(&self, who: NodeIndex) -> Option<SessionInfo> {
self.network.session_info(who)
}
fn is_expired(&self) -> bool {
self.network.is_expired()
}
fn peer_info(&self, peer_id: PeerId) -> String {
self.network.peer_client_version(peer_id)
fn peer_info(&self, who: NodeIndex) -> String {
self.network.peer_client_version(who)
}
}
+1 -1
View File
@@ -59,7 +59,7 @@ pub use service::{Service, FetchFuture, ConsensusService, BftMessageStream,
TransactionPool, Params, ManageNetwork, SyncProvider};
pub use protocol::{ProtocolStatus, PeerInfo, Context};
pub use sync::{Status as SyncStatus, SyncState};
pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, PeerId, ProtocolId, ConnectionFilter, ConnectionDirection, Severity};
pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, NodeIndex, ProtocolId, ConnectionFilter, ConnectionDirection, Severity};
pub use message::{generic as generic_message, RequestId, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal, Status as StatusMessage};
pub use error::Error;
pub use config::{Roles, ProtocolConfig};
+16 -16
View File
@@ -28,7 +28,7 @@ use client;
use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest};
use io::SyncIo;
use message;
use network_libp2p::{Severity, PeerId};
use network_libp2p::{Severity, NodeIndex};
use service;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
@@ -38,16 +38,16 @@ const REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
/// On-demand service API.
pub trait OnDemandService<Block: BlockT>: Send + Sync {
/// When new node is connected.
fn on_connect(&self, peer: PeerId, role: service::Roles);
fn on_connect(&self, peer: NodeIndex, role: service::Roles);
/// When node is disconnected.
fn on_disconnect(&self, peer: PeerId);
fn on_disconnect(&self, peer: NodeIndex);
/// Maintain peers requests.
fn maintain_peers(&self, io: &mut SyncIo);
/// When call response is received from remote node.
fn on_remote_call_response(&self, io: &mut SyncIo, peer: PeerId, response: message::RemoteCallResponse);
fn on_remote_call_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteCallResponse);
}
/// On-demand requests service. Dispatches requests to appropriate peers.
@@ -66,8 +66,8 @@ struct OnDemandCore<B: BlockT, E: service::ExecuteInContext<B>> {
service: Weak<E>,
next_request_id: u64,
pending_requests: VecDeque<Request<B>>,
active_peers: LinkedHashMap<PeerId, Request<B>>,
idle_peers: VecDeque<PeerId>,
active_peers: LinkedHashMap<NodeIndex, Request<B>>,
idle_peers: VecDeque<NodeIndex>,
}
struct Request<Block: BlockT> {
@@ -132,7 +132,7 @@ impl<B: BlockT, E> OnDemand<B, E> where
}
/// Try to accept response from given peer.
fn accept_response<F: FnOnce(Request<B>) -> Accept<B>>(&self, rtype: &str, io: &mut SyncIo, peer: PeerId, request_id: u64, try_accept: F) {
fn accept_response<F: FnOnce(Request<B>) -> Accept<B>>(&self, rtype: &str, io: &mut SyncIo, peer: NodeIndex, request_id: u64, try_accept: F) {
let mut core = self.core.lock();
let request = match core.remove(peer, request_id) {
Some(request) => request,
@@ -165,7 +165,7 @@ impl<B, E> OnDemandService<B> for OnDemand<B, E> where
E: service::ExecuteInContext<B>,
B::Header: HeaderT,
{
fn on_connect(&self, peer: PeerId, role: service::Roles) {
fn on_connect(&self, peer: NodeIndex, role: service::Roles) {
if !role.intersects(service::Roles::FULL | service::Roles::AUTHORITY) { // TODO: correct?
return;
}
@@ -175,7 +175,7 @@ impl<B, E> OnDemandService<B> for OnDemand<B, E> where
core.dispatch();
}
fn on_disconnect(&self, peer: PeerId) {
fn on_disconnect(&self, peer: NodeIndex) {
let mut core = self.core.lock();
core.remove_peer(peer);
core.dispatch();
@@ -189,7 +189,7 @@ impl<B, E> OnDemandService<B> for OnDemand<B, E> where
core.dispatch();
}
fn on_remote_call_response(&self, io: &mut SyncIo, peer: PeerId, response: message::RemoteCallResponse) {
fn on_remote_call_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteCallResponse) {
self.accept_response("call", io, peer, response.id, |request| match request.data {
RequestData::RemoteCall(request, sender) => match self.checker.check_execution_proof(&request, response.proof) {
Ok(response) => {
@@ -222,11 +222,11 @@ impl<B, E> OnDemandCore<B, E> where
E: service::ExecuteInContext<B>,
B::Header: HeaderT,
{
pub fn add_peer(&mut self, peer: PeerId) {
pub fn add_peer(&mut self, peer: NodeIndex) {
self.idle_peers.push_back(peer);
}
pub fn remove_peer(&mut self, peer: PeerId) {
pub fn remove_peer(&mut self, peer: NodeIndex) {
if let Some(request) = self.active_peers.remove(&peer) {
self.pending_requests.push_front(request);
return;
@@ -237,7 +237,7 @@ impl<B, E> OnDemandCore<B, E> where
}
}
pub fn maintain_peers(&mut self) -> Vec<PeerId> {
pub fn maintain_peers(&mut self) -> Vec<NodeIndex> {
let now = Instant::now();
let mut bad_peers = Vec::new();
loop {
@@ -263,7 +263,7 @@ impl<B, E> OnDemandCore<B, E> where
});
}
pub fn remove(&mut self, peer: PeerId, id: u64) -> Option<Request<B>> {
pub fn remove(&mut self, peer: NodeIndex, id: u64) -> Option<Request<B>> {
match self.active_peers.entry(peer) {
Entry::Occupied(entry) => match entry.get().id == id {
true => {
@@ -321,7 +321,7 @@ pub mod tests {
use client;
use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest};
use message;
use network_libp2p::PeerId;
use network_libp2p::NodeIndex;
use service::{Roles, ExecuteInContext};
use test::TestIo;
use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService};
@@ -358,7 +358,7 @@ pub mod tests {
core.idle_peers.len() + core.active_peers.len()
}
fn receive_call_response(on_demand: &OnDemand<Block, DummyExecutor>, network: &mut TestIo, peer: PeerId, id: message::RequestId) {
fn receive_call_response(on_demand: &OnDemand<Block, DummyExecutor>, network: &mut TestIo, peer: NodeIndex, id: message::RequestId) {
on_demand.on_remote_call_response(network, peer, message::RemoteCallResponse {
id: id,
proof: vec![vec![2]],
+84 -84
View File
@@ -21,7 +21,7 @@ use std::time;
use parking_lot::RwLock;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, As};
use runtime_primitives::generic::BlockId;
use network_libp2p::{PeerId, Severity};
use network_libp2p::{NodeIndex, Severity};
use codec::{Encode, Decode};
use message::{self, Message};
@@ -55,7 +55,7 @@ pub struct Protocol<B: BlockT, S: Specialization<B>> {
specialization: RwLock<S>,
context_data: ContextData<B>,
// Connected peers pending Status message.
handshaking_peers: RwLock<HashMap<PeerId, time::Instant>>,
handshaking_peers: RwLock<HashMap<NodeIndex, time::Instant>>,
transaction_pool: Arc<TransactionPool<B>>,
}
/// Syncing status and statistics
@@ -110,13 +110,13 @@ pub trait Context<B: BlockT> {
fn client(&self) -> &::chain::Client<B>;
/// Point out that a peer has been malign or irresponsible or appeared lazy.
fn report_peer(&mut self, peer_id: PeerId, reason: Severity);
fn report_peer(&mut self, who: NodeIndex, reason: Severity);
/// Get peer info.
fn peer_info(&self, peer: PeerId) -> Option<PeerInfo<B>>;
fn peer_info(&self, peer: NodeIndex) -> Option<PeerInfo<B>>;
/// Send a message to a peer.
fn send_message(&mut self, peer_id: PeerId, data: ::message::Message<B>);
fn send_message(&mut self, who: NodeIndex, data: ::message::Message<B>);
}
/// Protocol context.
@@ -134,17 +134,17 @@ impl<'a, B: BlockT + 'a> ProtocolContext<'a, B> {
}
/// Send a message to a peer.
pub fn send_message(&mut self, peer_id: PeerId, message: Message<B>) {
send_message(&self.context_data.peers, self.io, peer_id, message)
pub fn send_message(&mut self, who: NodeIndex, message: Message<B>) {
send_message(&self.context_data.peers, self.io, who, message)
}
/// Point out that a peer has been malign or irresponsible or appeared lazy.
pub fn report_peer(&mut self, peer_id: PeerId, reason: Severity) {
self.io.report_peer(peer_id, reason);
pub fn report_peer(&mut self, who: NodeIndex, reason: Severity) {
self.io.report_peer(who, reason);
}
/// Get peer info.
pub fn peer_info(&self, peer: PeerId) -> Option<PeerInfo<B>> {
pub fn peer_info(&self, peer: NodeIndex) -> Option<PeerInfo<B>> {
self.context_data.peers.read().get(&peer).map(|p| {
PeerInfo {
roles: p.roles,
@@ -157,16 +157,16 @@ impl<'a, B: BlockT + 'a> ProtocolContext<'a, B> {
}
impl<'a, B: BlockT + 'a> Context<B> for ProtocolContext<'a, B> {
fn send_message(&mut self, peer_id: PeerId, message: Message<B>) {
ProtocolContext::send_message(self, peer_id, message);
fn send_message(&mut self, who: NodeIndex, message: Message<B>) {
ProtocolContext::send_message(self, who, message);
}
fn report_peer(&mut self, peer_id: PeerId, reason: Severity) {
ProtocolContext::report_peer(self, peer_id, reason);
fn report_peer(&mut self, who: NodeIndex, reason: Severity) {
ProtocolContext::report_peer(self, who, reason);
}
fn peer_info(&self, peer_id: PeerId) -> Option<PeerInfo<B>> {
ProtocolContext::peer_info(self, peer_id)
fn peer_info(&self, who: NodeIndex) -> Option<PeerInfo<B>> {
ProtocolContext::peer_info(self, who)
}
fn client(&self) -> &Client<B> {
@@ -177,7 +177,7 @@ impl<'a, B: BlockT + 'a> Context<B> for ProtocolContext<'a, B> {
/// Data necessary to create a context.
pub(crate) struct ContextData<B: BlockT> {
// All connected peers
peers: RwLock<HashMap<PeerId, Peer<B>>>,
peers: RwLock<HashMap<NodeIndex, Peer<B>>>,
chain: Arc<Client<B>>,
}
@@ -228,63 +228,63 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
}
}
pub fn handle_packet(&self, io: &mut SyncIo, peer_id: PeerId, mut data: &[u8]) {
pub fn handle_packet(&self, io: &mut SyncIo, who: NodeIndex, mut data: &[u8]) {
let message: Message<B> = match Decode::decode(&mut data) {
Some(m) => m,
None => {
trace!(target: "sync", "Invalid packet from {}", peer_id);
io.report_peer(peer_id, Severity::Bad("Peer sent us a packet with invalid format"));
trace!(target: "sync", "Invalid packet from {}", who);
io.report_peer(who, Severity::Bad("Peer sent us a packet with invalid format"));
return;
}
};
match message {
GenericMessage::Status(s) => self.on_status_message(io, peer_id, s),
GenericMessage::BlockRequest(r) => self.on_block_request(io, peer_id, r),
GenericMessage::Status(s) => self.on_status_message(io, who, s),
GenericMessage::BlockRequest(r) => self.on_block_request(io, who, r),
GenericMessage::BlockResponse(r) => {
let request = {
let mut peers = self.context_data.peers.write();
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
if let Some(ref mut peer) = peers.get_mut(&who) {
peer.request_timestamp = None;
match mem::replace(&mut peer.block_request, None) {
Some(r) => r,
None => {
io.report_peer(peer_id, Severity::Bad("Unexpected response packet received from peer"));
io.report_peer(who, Severity::Bad("Unexpected response packet received from peer"));
return;
}
}
} else {
io.report_peer(peer_id, Severity::Bad("Unexpected packet received from peer"));
io.report_peer(who, Severity::Bad("Unexpected packet received from peer"));
return;
}
};
if request.id != r.id {
trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", peer_id, request.id, r.id);
trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", who, request.id, r.id);
return;
}
self.on_block_response(io, peer_id, request, r);
self.on_block_response(io, who, request, r);
},
GenericMessage::BlockAnnounce(announce) => self.on_block_announce(io, peer_id, announce),
GenericMessage::Transactions(m) => self.on_extrinsics(io, peer_id, m),
GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(io, peer_id, request),
GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(io, peer_id, response),
other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), peer_id, other),
GenericMessage::BlockAnnounce(announce) => self.on_block_announce(io, who, announce),
GenericMessage::Transactions(m) => self.on_extrinsics(io, who, m),
GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(io, who, request),
GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(io, who, response),
other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, other),
}
}
pub fn send_message(&self, io: &mut SyncIo, peer_id: PeerId, message: Message<B>) {
send_message::<B>(&self.context_data.peers, io, peer_id, message)
pub fn send_message(&self, io: &mut SyncIo, who: NodeIndex, message: Message<B>) {
send_message::<B>(&self.context_data.peers, io, who, message)
}
/// Called when a new peer is connected
pub fn on_peer_connected(&self, io: &mut SyncIo, peer_id: PeerId) {
trace!(target: "sync", "Connected {}: {}", peer_id, io.peer_info(peer_id));
self.handshaking_peers.write().insert(peer_id, time::Instant::now());
self.send_status(io, peer_id);
pub fn on_peer_connected(&self, io: &mut SyncIo, who: NodeIndex) {
trace!(target: "sync", "Connected {}: {}", who, io.peer_info(who));
self.handshaking_peers.write().insert(who, time::Instant::now());
self.send_status(io, who);
}
/// Called by peer when it is disconnecting
pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: PeerId) {
pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: NodeIndex) {
trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_info(peer));
// lock all the the peer lists so that add/remove peer events are in order
@@ -305,7 +305,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
}
}
fn on_block_request(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest<B>) {
fn on_block_request(&self, io: &mut SyncIo, peer: NodeIndex, request: message::BlockRequest<B>) {
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, request.from, request.to, request.max);
let mut blocks = Vec::new();
let mut id = match request.from {
@@ -351,7 +351,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
self.send_message(io, peer, GenericMessage::BlockResponse(response))
}
fn on_block_response(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest<B>, response: message::BlockResponse<B>) {
fn on_block_response(&self, io: &mut SyncIo, peer: NodeIndex, request: message::BlockRequest<B>, response: message::BlockResponse<B>) {
// TODO: validate response
trace!(target: "sync", "BlockResponse {} from {} with {} blocks", response.id, peer, response.blocks.len());
self.sync.write().on_block_data(&mut ProtocolContext::new(&self.context_data, io), peer, request, response);
@@ -369,12 +369,12 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
{
let peers = self.context_data.peers.read();
let handshaking_peers = self.handshaking_peers.read();
for (peer_id, timestamp) in peers.iter()
for (who, timestamp) in peers.iter()
.filter_map(|(id, peer)| peer.request_timestamp.as_ref().map(|r| (id, r)))
.chain(handshaking_peers.iter()) {
if (tick - *timestamp).as_secs() > REQUEST_TIMEOUT_SEC {
trace!(target: "sync", "Timeout {}", peer_id);
aborting.push(*peer_id);
trace!(target: "sync", "Timeout {}", who);
aborting.push(*who);
}
}
}
@@ -385,7 +385,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
}
}
pub fn peer_info(&self, peer: PeerId) -> Option<PeerInfo<B>> {
pub fn peer_info(&self, peer: NodeIndex) -> Option<PeerInfo<B>> {
self.context_data.peers.read().get(&peer).map(|p| {
PeerInfo {
roles: p.roles,
@@ -397,26 +397,26 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
}
/// Called by peer to report status
fn on_status_message(&self, io: &mut SyncIo, peer_id: PeerId, status: message::Status<B>) {
trace!(target: "sync", "New peer {} {:?}", peer_id, status);
fn on_status_message(&self, io: &mut SyncIo, who: NodeIndex, status: message::Status<B>) {
trace!(target: "sync", "New peer {} {:?}", who, status);
if io.is_expired() {
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
trace!(target: "sync", "Status packet from expired session {}:{}", who, io.peer_info(who));
return;
}
{
let mut peers = self.context_data.peers.write();
let mut handshaking_peers = self.handshaking_peers.write();
if peers.contains_key(&peer_id) {
debug!(target: "sync", "Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
if peers.contains_key(&who) {
debug!(target: "sync", "Unexpected status packet from {}:{}", who, io.peer_info(who));
return;
}
if status.genesis_hash != self.genesis_hash {
io.report_peer(peer_id, Severity::Bad(&format!("Peer is on different chain (our genesis: {} theirs: {})", self.genesis_hash, status.genesis_hash)));
io.report_peer(who, Severity::Bad(&format!("Peer is on different chain (our genesis: {} theirs: {})", self.genesis_hash, status.genesis_hash)));
return;
}
if status.version != CURRENT_VERSION {
io.report_peer(peer_id, Severity::Bad(&format!("Peer using unsupported protocol version {}", status.version)));
io.report_peer(who, Severity::Bad(&format!("Peer using unsupported protocol version {}", status.version)));
return;
}
@@ -431,27 +431,27 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
known_blocks: HashSet::new(),
next_request_id: 0,
};
peers.insert(peer_id.clone(), peer);
handshaking_peers.remove(&peer_id);
debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id));
peers.insert(who.clone(), peer);
handshaking_peers.remove(&who);
debug!(target: "sync", "Connected {} {}", who, io.peer_info(who));
}
let mut context = ProtocolContext::new(&self.context_data, io);
self.sync.write().new_peer(&mut context, peer_id);
self.specialization.write().on_connect(&mut context, peer_id, status.clone());
self.on_demand.as_ref().map(|s| s.on_connect(peer_id, status.roles));
self.sync.write().new_peer(&mut context, who);
self.specialization.write().on_connect(&mut context, who, status.clone());
self.on_demand.as_ref().map(|s| s.on_connect(who, status.roles));
}
/// Called when peer sends us new extrinsics
fn on_extrinsics(&self, _io: &mut SyncIo, peer_id: PeerId, extrinsics: message::Transactions<B::Extrinsic>) {
fn on_extrinsics(&self, _io: &mut SyncIo, who: NodeIndex, extrinsics: message::Transactions<B::Extrinsic>) {
// Accept extrinsics only when fully synced
if self.sync.read().status().state != SyncState::Idle {
trace!(target: "sync", "{} Ignoring extrinsics while syncing", peer_id);
trace!(target: "sync", "{} Ignoring extrinsics while syncing", who);
return;
}
trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), peer_id);
trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who);
let mut peers = self.context_data.peers.write();
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
if let Some(ref mut peer) = peers.get_mut(&who) {
for t in extrinsics {
if let Some(hash) = self.transaction_pool.import(&t) {
peer.known_extrinsics.insert(hash);
@@ -473,7 +473,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
let mut propagated_to = HashMap::new();
let mut peers = self.context_data.peers.write();
for (peer_id, ref mut peer) in peers.iter_mut() {
for (who, ref mut peer) in peers.iter_mut() {
let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
.iter()
.cloned()
@@ -481,7 +481,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
.unzip();
if !to_send.is_empty() {
let node_id = io.peer_session_info(*peer_id).map(|info| match info.id {
let node_id = io.peer_session_info(*who).map(|info| match info.id {
Some(id) => format!("{}@{:x}", info.remote_address, id),
None => info.remote_address.clone(),
});
@@ -491,15 +491,15 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
propagated_to.entry(hash).or_insert_with(Vec::new).push(id.clone());
}
}
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), peer_id);
self.send_message(io, *peer_id, GenericMessage::Transactions(to_send));
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
self.send_message(io, *who, GenericMessage::Transactions(to_send));
}
}
self.transaction_pool.on_broadcasted(propagated_to);
}
/// Send Status message
fn send_status(&self, io: &mut SyncIo, peer_id: PeerId) {
fn send_status(&self, io: &mut SyncIo, who: NodeIndex) {
if let Ok(info) = self.context_data.chain.info() {
let status = message::generic::Status {
version: CURRENT_VERSION,
@@ -509,7 +509,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
best_hash: info.chain.best_hash,
chain_status: self.specialization.read().status(),
};
self.send_message(io, peer_id, GenericMessage::Status(status))
self.send_message(io, who, GenericMessage::Status(status))
}
}
@@ -533,16 +533,16 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
self.abort();
}
pub fn on_block_announce(&self, io: &mut SyncIo, peer_id: PeerId, announce: message::BlockAnnounce<B::Header>) {
pub fn on_block_announce(&self, io: &mut SyncIo, who: NodeIndex, announce: message::BlockAnnounce<B::Header>) {
let header = announce.header;
let hash = header.hash();
{
let mut peers = self.context_data.peers.write();
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
if let Some(ref mut peer) = peers.get_mut(&who) {
peer.known_blocks.insert(hash.clone());
}
}
self.sync.write().on_block_announce(&mut ProtocolContext::new(&self.context_data, io), peer_id, hash, &header);
self.sync.write().on_block_announce(&mut ProtocolContext::new(&self.context_data, io), who, hash, &header);
}
pub fn on_block_imported(&self, io: &mut SyncIo, hash: B::Hash, header: &B::Header) {
@@ -561,35 +561,35 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
// send out block announcements
let mut peers = self.context_data.peers.write();
for (peer_id, ref mut peer) in peers.iter_mut() {
for (who, ref mut peer) in peers.iter_mut() {
if peer.known_blocks.insert(hash.clone()) {
trace!(target: "sync", "Announcing block {:?} to {}", hash, peer_id);
self.send_message(io, *peer_id, GenericMessage::BlockAnnounce(message::BlockAnnounce {
trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
self.send_message(io, *who, GenericMessage::BlockAnnounce(message::BlockAnnounce {
header: header.clone()
}));
}
}
}
fn on_remote_call_request(&self, io: &mut SyncIo, peer_id: PeerId, request: message::RemoteCallRequest<B::Hash>) {
trace!(target: "sync", "Remote call request {} from {} ({} at {})", request.id, peer_id, request.method, request.block);
fn on_remote_call_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteCallRequest<B::Hash>) {
trace!(target: "sync", "Remote call request {} from {} ({} at {})", request.id, who, request.method, request.block);
let proof = match self.context_data.chain.execution_proof(&request.block, &request.method, &request.data) {
Ok((_, proof)) => proof,
Err(error) => {
trace!(target: "sync", "Remote call request {} from {} ({} at {}) failed with: {}",
request.id, peer_id, request.method, request.block, error);
request.id, who, request.method, request.block, error);
Default::default()
},
};
self.send_message(io, peer_id, GenericMessage::RemoteCallResponse(message::RemoteCallResponse {
self.send_message(io, who, GenericMessage::RemoteCallResponse(message::RemoteCallResponse {
id: request.id, proof,
}));
}
fn on_remote_call_response(&self, io: &mut SyncIo, peer_id: PeerId, response: message::RemoteCallResponse) {
trace!(target: "sync", "Remote call response {} from {}", response.id, peer_id);
self.on_demand.as_ref().map(|s| s.on_remote_call_response(io, peer_id, response));
fn on_remote_call_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteCallResponse) {
trace!(target: "sync", "Remote call response {} from {}", response.id, who);
self.on_demand.as_ref().map(|s| s.on_remote_call_response(io, who, response));
}
/// Execute a closure with access to a network context and specialization.
@@ -600,11 +600,11 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
}
}
fn send_message<B: BlockT>(peers: &RwLock<HashMap<PeerId, Peer<B>>>, io: &mut SyncIo, peer_id: PeerId, mut message: Message<B>) {
fn send_message<B: BlockT>(peers: &RwLock<HashMap<NodeIndex, Peer<B>>>, io: &mut SyncIo, who: NodeIndex, mut message: Message<B>) {
match &mut message {
&mut GenericMessage::BlockRequest(ref mut r) => {
let mut peers = peers.write();
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
if let Some(ref mut peer) = peers.get_mut(&who) {
r.id = peer.next_request_id;
peer.next_request_id = peer.next_request_id + 1;
peer.block_request = Some(r.clone());
@@ -613,7 +613,7 @@ fn send_message<B: BlockT>(peers: &RwLock<HashMap<PeerId, Peer<B>>>, io: &mut Sy
},
_ => (),
}
io.send(peer_id, message.encode());
io.send(who, message.encode());
}
/// Hash a message.
+7 -7
View File
@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::io;
use std::time::Duration;
use futures::sync::{oneshot, mpsc};
use network_libp2p::{NetworkProtocolHandler, NetworkContext, PeerId, ProtocolId,
use network_libp2p::{NetworkProtocolHandler, NetworkContext, NodeIndex, ProtocolId,
NetworkConfiguration , NonReservedPeerMode, ErrorKind};
use network_libp2p::{NetworkService};
use core_io::{TimerToken};
@@ -244,8 +244,8 @@ impl<B: BlockT + 'static, S: Specialization<B>> SyncProvider<B> for Service<B, S
self.network.with_context_eval(self.protocol_id, |ctx| {
let peer_ids = self.network.connected_peers();
peer_ids.into_iter().filter_map(|peer_id| {
let session_info = match ctx.session_info(peer_id) {
peer_ids.into_iter().filter_map(|who| {
let session_info = match ctx.session_info(who) {
None => return None,
Some(info) => info,
};
@@ -256,7 +256,7 @@ impl<B: BlockT + 'static, S: Specialization<B>> SyncProvider<B> for Service<B, S
capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(),
remote_address: session_info.remote_address,
local_address: session_info.local_address,
dot_info: self.handler.protocol.peer_info(peer_id),
dot_info: self.handler.protocol.peer_info(who),
})
}).collect()
}).unwrap_or_else(Vec::new)
@@ -276,15 +276,15 @@ impl<B: BlockT + 'static, S: Specialization<B>> NetworkProtocolHandler for Proto
.expect("Error registering transaction propagation timer");
}
fn read(&self, io: &NetworkContext, peer: &PeerId, _packet_id: u8, data: &[u8]) {
fn read(&self, io: &NetworkContext, peer: &NodeIndex, _packet_id: u8, data: &[u8]) {
self.protocol.handle_packet(&mut NetSyncIo::new(io), *peer, data);
}
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
fn connected(&self, io: &NetworkContext, peer: &NodeIndex) {
self.protocol.on_peer_connected(&mut NetSyncIo::new(io), *peer);
}
fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
fn disconnected(&self, io: &NetworkContext, peer: &NodeIndex) {
self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer);
}
@@ -16,7 +16,7 @@
//! Specializations of the substrate network protocol to allow more complex forms of communication.
use ::PeerId;
use ::NodeIndex;
use runtime_primitives::traits::Block as BlockT;
use protocol::Context;
@@ -29,13 +29,13 @@ pub trait Specialization<B: BlockT>: Send + Sync + 'static {
fn on_start(&mut self) { }
/// Called when a peer successfully handshakes.
fn on_connect(&mut self, ctx: &mut Context<B>, peer_id: PeerId, status: ::message::Status<B>);
fn on_connect(&mut self, ctx: &mut Context<B>, who: NodeIndex, status: ::message::Status<B>);
/// Called when a peer is disconnected. If the peer ID is unknown, it should be ignored.
fn on_disconnect(&mut self, ctx: &mut Context<B>, peer_id: PeerId);
fn on_disconnect(&mut self, ctx: &mut Context<B>, who: NodeIndex);
/// Called when a network-specific message arrives.
fn on_message(&mut self, ctx: &mut Context<B>, peer_id: PeerId, message: ::message::Message<B>);
fn on_message(&mut self, ctx: &mut Context<B>, who: NodeIndex, message: ::message::Message<B>);
/// Called on abort.
fn on_abort(&mut self) { }
+51 -51
View File
@@ -17,7 +17,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use protocol::Context;
use network_libp2p::{Severity, PeerId};
use network_libp2p::{Severity, NodeIndex};
use client::{BlockStatus, BlockOrigin, ClientInfo};
use client::error::Error as ClientError;
use blocks::{self, BlockCollection};
@@ -51,7 +51,7 @@ enum PeerSyncState<B: BlockT> {
/// Relay chain sync strategy.
pub struct ChainSync<B: BlockT> {
genesis_hash: B::Hash,
peers: HashMap<PeerId, PeerSync<B>>,
peers: HashMap<NodeIndex, PeerSync<B>>,
blocks: BlockCollection<B>,
best_queued_number: NumberFor<B>,
best_queued_hash: B::Hash,
@@ -119,47 +119,47 @@ impl<B: BlockT> ChainSync<B> {
}
/// Handle new connected peer.
pub(crate) fn new_peer(&mut self, protocol: &mut Context<B>, peer_id: PeerId) {
if let Some(info) = protocol.peer_info(peer_id) {
pub(crate) fn new_peer(&mut self, protocol: &mut Context<B>, who: NodeIndex) {
if let Some(info) = protocol.peer_info(who) {
match (block_status(&*protocol.client(), &*self.import_queue, info.best_hash), info.best_number) {
(Err(e), _) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
protocol.report_peer(peer_id, Severity::Useless(&format!("Error legimimately reading blockchain status: {:?}", e)));
protocol.report_peer(who, Severity::Useless(&format!("Error legimimately reading blockchain status: {:?}", e)));
},
(Ok(BlockStatus::KnownBad), _) => {
protocol.report_peer(peer_id, Severity::Bad(&format!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number)));
protocol.report_peer(who, Severity::Bad(&format!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number)));
},
(Ok(BlockStatus::Unknown), b) if b == As::sa(0) => {
protocol.report_peer(peer_id, Severity::Bad(&format!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number)));
protocol.report_peer(who, Severity::Bad(&format!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number)));
},
(Ok(BlockStatus::Unknown), _) => {
let our_best = self.best_queued_number;
if our_best > As::sa(0) {
debug!(target:"sync", "New peer with unknown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number);
self.peers.insert(peer_id, PeerSync {
self.peers.insert(who, PeerSync {
common_hash: self.genesis_hash,
common_number: As::sa(0),
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::AncestorSearch(our_best),
});
Self::request_ancestry(protocol, peer_id, our_best)
Self::request_ancestry(protocol, who, our_best)
} else {
// We are at genesis, just start downloading
debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number);
self.peers.insert(peer_id, PeerSync {
self.peers.insert(who, PeerSync {
common_hash: self.genesis_hash,
common_number: As::sa(0),
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::Available,
});
self.download_new(protocol, peer_id)
self.download_new(protocol, who)
}
},
(Ok(BlockStatus::Queued), _) | (Ok(BlockStatus::InChain), _) => {
debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number);
self.peers.insert(peer_id, PeerSync {
self.peers.insert(who, PeerSync {
common_hash: info.best_hash,
common_number: info.best_number,
best_hash: info.best_hash,
@@ -171,27 +171,27 @@ impl<B: BlockT> ChainSync<B> {
}
}
pub(crate) fn on_block_data(&mut self, protocol: &mut Context<B>, peer_id: PeerId, _request: message::BlockRequest<B>, response: message::BlockResponse<B>) {
let new_blocks = if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
pub(crate) fn on_block_data(&mut self, protocol: &mut Context<B>, who: NodeIndex, _request: message::BlockRequest<B>, response: message::BlockResponse<B>) {
let new_blocks = if let Some(ref mut peer) = self.peers.get_mut(&who) {
match peer.state {
PeerSyncState::DownloadingNew(start_block) => {
self.blocks.clear_peer_download(peer_id);
self.blocks.clear_peer_download(who);
peer.state = PeerSyncState::Available;
self.blocks.insert(start_block, response.blocks, peer_id);
self.blocks.insert(start_block, response.blocks, who);
self.blocks.drain(self.best_queued_number + As::sa(1))
},
PeerSyncState::DownloadingStale(_) => {
peer.state = PeerSyncState::Available;
response.blocks.into_iter().map(|b| blocks::BlockData {
origin: peer_id,
origin: who,
block: b
}).collect()
},
PeerSyncState::AncestorSearch(n) => {
match response.blocks.get(0) {
Some(ref block) => {
trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", n, block.hash, peer_id);
trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", n, block.hash, who);
match protocol.client().block_hash(n) {
Ok(Some(block_hash)) if block_hash == block.hash => {
if peer.common_number < n {
@@ -199,30 +199,30 @@ impl<B: BlockT> ChainSync<B> {
peer.common_number = n;
}
peer.state = PeerSyncState::Available;
trace!(target:"sync", "Found common ancestor for peer {}: {} ({})", peer_id, block.hash, n);
trace!(target:"sync", "Found common ancestor for peer {}: {} ({})", who, block.hash, n);
vec![]
},
Ok(our_best) if n > As::sa(0) => {
trace!(target:"sync", "Ancestry block mismatch for peer {}: theirs: {} ({}), ours: {:?}", peer_id, block.hash, n, our_best);
trace!(target:"sync", "Ancestry block mismatch for peer {}: theirs: {} ({}), ours: {:?}", who, block.hash, n, our_best);
let n = n - As::sa(1);
peer.state = PeerSyncState::AncestorSearch(n);
Self::request_ancestry(protocol, peer_id, n);
Self::request_ancestry(protocol, who, n);
return;
},
Ok(_) => { // genesis mismatch
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", peer_id);
protocol.report_peer(peer_id, Severity::Bad("Ancestry search: genesis mismatch for peer"));
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who);
protocol.report_peer(who, Severity::Bad("Ancestry search: genesis mismatch for peer"));
return;
},
Err(e) => {
protocol.report_peer(peer_id, Severity::Useless(&format!("Error answering legitimate blockchain query: {:?}", e)));
protocol.report_peer(who, Severity::Useless(&format!("Error answering legitimate blockchain query: {:?}", e)));
return;
}
}
},
None => {
trace!(target:"sync", "Invalid response when searching for ancestor from {}", peer_id);
protocol.report_peer(peer_id, Severity::Bad("Invalid response when searching for ancestor"));
trace!(target:"sync", "Invalid response when searching for ancestor from {}", who);
protocol.report_peer(who, Severity::Bad("Invalid response when searching for ancestor"));
return;
}
}
@@ -241,7 +241,7 @@ impl<B: BlockT> ChainSync<B> {
}
pub fn maintain_sync(&mut self, protocol: &mut Context<B>) {
let peers: Vec<PeerId> = self.peers.keys().map(|p| *p).collect();
let peers: Vec<NodeIndex> = self.peers.keys().map(|p| *p).collect();
for peer in peers {
self.download_new(protocol, peer);
}
@@ -267,9 +267,9 @@ impl<B: BlockT> ChainSync<B> {
self.block_imported(&hash, best_header.number().clone())
}
pub(crate) fn on_block_announce(&mut self, protocol: &mut Context<B>, peer_id: PeerId, hash: B::Hash, header: &B::Header) {
pub(crate) fn on_block_announce(&mut self, protocol: &mut Context<B>, who: NodeIndex, hash: B::Hash, header: &B::Header) {
let number = *header.number();
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
if number > peer.best_number {
peer.best_number = number;
peer.best_hash = hash;
@@ -285,17 +285,17 @@ impl<B: BlockT> ChainSync<B> {
let stale = number <= self.best_queued_number;
if stale {
if !self.is_known_or_already_downloading(protocol, header.parent_hash()) {
trace!(target: "sync", "Ignoring unknown stale block announce from {}: {} {:?}", peer_id, hash, header);
trace!(target: "sync", "Ignoring unknown stale block announce from {}: {} {:?}", who, hash, header);
} else {
trace!(target: "sync", "Downloading new stale block announced from {}: {} {:?}", peer_id, hash, header);
self.download_stale(protocol, peer_id, &hash);
trace!(target: "sync", "Downloading new stale block announced from {}: {} {:?}", who, hash, header);
self.download_stale(protocol, who, &hash);
}
} else {
trace!(target: "sync", "Downloading new block announced from {}: {} {:?}", peer_id, hash, header);
self.download_new(protocol, peer_id);
trace!(target: "sync", "Downloading new block announced from {}: {} {:?}", who, hash, header);
self.download_new(protocol, who);
}
} else {
trace!(target: "sync", "Known block announce from {}: {}", peer_id, hash);
trace!(target: "sync", "Known block announce from {}: {}", who, hash);
}
}
@@ -304,16 +304,16 @@ impl<B: BlockT> ChainSync<B> {
|| block_status(&*protocol.client(), &*self.import_queue, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
}
pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context<B>, peer_id: PeerId) {
self.blocks.clear_peer_download(peer_id);
self.peers.remove(&peer_id);
pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context<B>, who: NodeIndex) {
self.blocks.clear_peer_download(who);
self.peers.remove(&who);
self.maintain_sync(protocol);
}
pub(crate) fn restart(&mut self, protocol: &mut Context<B>) {
self.import_queue.clear();
self.blocks.clear();
let ids: Vec<PeerId> = self.peers.keys().map(|p| *p).collect();
let ids: Vec<NodeIndex> = self.peers.keys().map(|p| *p).collect();
for id in ids {
self.new_peer(protocol, id);
}
@@ -336,8 +336,8 @@ impl<B: BlockT> ChainSync<B> {
}
// Download old block.
fn download_stale(&mut self, protocol: &mut Context<B>, peer_id: PeerId, hash: &B::Hash) {
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
fn download_stale(&mut self, protocol: &mut Context<B>, who: NodeIndex, hash: &B::Hash) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
match peer.state {
PeerSyncState::Available => {
let request = message::generic::BlockRequest {
@@ -349,7 +349,7 @@ impl<B: BlockT> ChainSync<B> {
max: Some(1),
};
peer.state = PeerSyncState::DownloadingStale(*hash);
protocol.send_message(peer_id, GenericMessage::BlockRequest(request));
protocol.send_message(who, GenericMessage::BlockRequest(request));
},
_ => (),
}
@@ -357,8 +357,8 @@ impl<B: BlockT> ChainSync<B> {
}
// Issue a request for a peer to download new blocks, if any are available
fn download_new(&mut self, protocol: &mut Context<B>, peer_id: PeerId) {
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
fn download_new(&mut self, protocol: &mut Context<B>, who: NodeIndex) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
let import_status = self.import_queue.status();
// when there are too many blocks in the queue => do not try to download new blocks
if import_status.importing_count > MAX_IMPORING_BLOCKS {
@@ -367,11 +367,11 @@ impl<B: BlockT> ChainSync<B> {
// we should not download already queued blocks
let common_number = ::std::cmp::max(peer.common_number, import_status.best_importing_number);
trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", peer_id, common_number, peer.best_number);
trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", who, common_number, peer.best_number);
match peer.state {
PeerSyncState::Available => {
if let Some(range) = self.blocks.needed_blocks(peer_id, MAX_BLOCKS_TO_REQUEST, peer.best_number, common_number) {
trace!(target: "sync", "Requesting blocks from {}, ({} to {})", peer_id, range.start, range.end);
if let Some(range) = self.blocks.needed_blocks(who, MAX_BLOCKS_TO_REQUEST, peer.best_number, common_number) {
trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, range.end);
let request = message::generic::BlockRequest {
id: 0,
fields: self.required_block_attributes.clone(),
@@ -381,7 +381,7 @@ impl<B: BlockT> ChainSync<B> {
max: Some((range.end - range.start).as_() as u32),
};
peer.state = PeerSyncState::DownloadingNew(range.start);
protocol.send_message(peer_id, GenericMessage::BlockRequest(request));
protocol.send_message(who, GenericMessage::BlockRequest(request));
} else {
trace!(target: "sync", "Nothing to request");
}
@@ -391,8 +391,8 @@ impl<B: BlockT> ChainSync<B> {
}
}
fn request_ancestry(protocol: &mut Context<B>, peer_id: PeerId, block: NumberFor<B>) {
trace!(target: "sync", "Requesting ancestry block #{} from {}", block, peer_id);
fn request_ancestry(protocol: &mut Context<B>, who: NodeIndex, block: NumberFor<B>) {
trace!(target: "sync", "Requesting ancestry block #{} from {}", block, who);
let request = message::generic::BlockRequest {
id: 0,
fields: message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION,
@@ -401,7 +401,7 @@ impl<B: BlockT> ChainSync<B> {
direction: message::Direction::Ascending,
max: Some(1),
};
protocol.send_message(peer_id, GenericMessage::BlockRequest(request));
protocol.send_message(who, GenericMessage::BlockRequest(request));
}
}
+25 -25
View File
@@ -28,7 +28,7 @@ use io::SyncIo;
use protocol::{Context, Protocol};
use config::ProtocolConfig;
use service::TransactionPool;
use network_libp2p::{PeerId, SessionInfo, Severity};
use network_libp2p::{NodeIndex, SessionInfo, Severity};
use keyring::Keyring;
use codec::Encode;
use import_queue::tests::SyncImportQueue;
@@ -41,29 +41,29 @@ pub struct DummySpecialization;
impl Specialization<Block> for DummySpecialization {
fn status(&self) -> Vec<u8> { vec![] }
fn on_connect(&mut self, _ctx: &mut Context<Block>, _peer_id: PeerId, _status: ::message::Status<Block>) {
fn on_connect(&mut self, _ctx: &mut Context<Block>, _peer_id: NodeIndex, _status: ::message::Status<Block>) {
}
fn on_disconnect(&mut self, _ctx: &mut Context<Block>, _peer_id: PeerId) {
fn on_disconnect(&mut self, _ctx: &mut Context<Block>, _peer_id: NodeIndex) {
}
fn on_message(&mut self, _ctx: &mut Context<Block>, _peer_id: PeerId, _message: ::message::Message<Block>) {
fn on_message(&mut self, _ctx: &mut Context<Block>, _peer_id: NodeIndex, _message: ::message::Message<Block>) {
}
}
pub struct TestIo<'p> {
queue: &'p RwLock<VecDeque<TestPacket>>,
pub to_disconnect: HashSet<PeerId>,
pub to_disconnect: HashSet<NodeIndex>,
packets: Vec<TestPacket>,
peers_info: HashMap<PeerId, String>,
_sender: Option<PeerId>,
peers_info: HashMap<NodeIndex, String>,
_sender: Option<NodeIndex>,
}
impl<'p> TestIo<'p> where {
pub fn new(queue: &'p RwLock<VecDeque<TestPacket>>, sender: Option<PeerId>) -> TestIo<'p> {
pub fn new(queue: &'p RwLock<VecDeque<TestPacket>>, sender: Option<NodeIndex>) -> TestIo<'p> {
TestIo {
queue: queue,
_sender: sender,
@@ -81,28 +81,28 @@ impl<'p> Drop for TestIo<'p> {
}
impl<'p> SyncIo for TestIo<'p> {
fn report_peer(&mut self, peer_id: PeerId, _reason: Severity) {
self.to_disconnect.insert(peer_id);
fn report_peer(&mut self, who: NodeIndex, _reason: Severity) {
self.to_disconnect.insert(who);
}
fn is_expired(&self) -> bool {
false
}
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) {
fn send(&mut self, who: NodeIndex, data: Vec<u8>) {
self.packets.push(TestPacket {
data: data,
recipient: peer_id,
recipient: who,
});
}
fn peer_info(&self, peer_id: PeerId) -> String {
self.peers_info.get(&peer_id)
fn peer_info(&self, who: NodeIndex) -> String {
self.peers_info.get(&who)
.cloned()
.unwrap_or_else(|| peer_id.to_string())
.unwrap_or_else(|| who.to_string())
}
fn peer_session_info(&self, _peer_id: PeerId) -> Option<SessionInfo> {
fn peer_session_info(&self, _peer_id: NodeIndex) -> Option<SessionInfo> {
None
}
}
@@ -110,7 +110,7 @@ impl<'p> SyncIo for TestIo<'p> {
/// Mocked subprotocol packet
pub struct TestPacket {
data: Vec<u8>,
recipient: PeerId,
recipient: NodeIndex,
}
pub struct Peer {
@@ -129,18 +129,18 @@ impl Peer {
}
/// Called on connection to other indicated peer.
fn on_connect(&self, other: PeerId) {
fn on_connect(&self, other: NodeIndex) {
self.sync.on_peer_connected(&mut TestIo::new(&self.queue, Some(other)), other);
}
/// Called on disconnect from other indicated peer.
fn on_disconnect(&self, other: PeerId) {
fn on_disconnect(&self, other: NodeIndex) {
let mut io = TestIo::new(&self.queue, Some(other));
self.sync.on_peer_disconnected(&mut io, other);
}
/// Receive a message from another peer. Return a set of peers to disconnect.
fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> {
fn receive_message(&self, from: NodeIndex, msg: TestPacket) -> HashSet<NodeIndex> {
let mut io = TestIo::new(&self.queue, Some(from));
self.sync.handle_packet(&mut io, from, &msg.data);
self.flush();
@@ -219,7 +219,7 @@ impl TransactionPool<Block> for EmptyTransactionPool {
pub struct TestNet {
peers: Vec<Arc<Peer>>,
started: bool,
disconnect_events: Vec<(PeerId, PeerId)>, //disconnected (initiated by, to)
disconnect_events: Vec<(NodeIndex, NodeIndex)>, //disconnected (initiated by, to)
}
impl TestNet {
@@ -264,7 +264,7 @@ impl TestNet {
self.peers[peer].start();
for client in 0..self.peers.len() {
if peer != client {
self.peers[peer].on_connect(client as PeerId);
self.peers[peer].on_connect(client as NodeIndex);
}
}
}
@@ -278,17 +278,17 @@ impl TestNet {
let disconnecting = {
let recipient = packet.recipient;
trace!("--- {} -> {} ---", peer, recipient);
let to_disconnect = self.peers[recipient].receive_message(peer as PeerId, packet);
let to_disconnect = self.peers[recipient].receive_message(peer as NodeIndex, packet);
for d in &to_disconnect {
// notify this that disconnecting peers are disconnecting
self.peers[recipient].on_disconnect(*d as PeerId);
self.peers[recipient].on_disconnect(*d as NodeIndex);
self.disconnect_events.push((peer, *d));
}
to_disconnect
};
for d in &disconnecting {
// notify other peers that this peer is disconnecting
self.peers[*d].on_disconnect(peer as PeerId);
self.peers[*d].on_disconnect(peer as NodeIndex);
}
}