mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 22:07:58 +00:00
Add connected peers to protocol, use in sync provider (#1857)
* add connected peers to protocol, use in sync provider * use PeerId::random * address comments` * docs * fix import of PeerId * rewrite rpc tests using PeerId::random * whitespace * nits * remove option around peer id and remove field * further removal of the option around peer id * fix rpc tests
This commit is contained in:
committed by
Gav Wood
parent
ea7da0d4a4
commit
bc15fa31ff
@@ -15,7 +15,7 @@
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crossbeam_channel::{self as channel, Receiver, Sender, select};
|
||||
use network_libp2p::{NodeIndex, Severity};
|
||||
use network_libp2p::{NodeIndex, PeerId, Severity};
|
||||
use primitives::storage::StorageKey;
|
||||
use runtime_primitives::generic::BlockId;
|
||||
use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberFor, Zero};
|
||||
@@ -28,6 +28,7 @@ use crate::specialization::NetworkSpecialization;
|
||||
use crate::sync::{ChainSync, Status as SyncStatus, SyncState};
|
||||
use crate::service::{NetworkChan, NetworkMsg, TransactionPool, ExHashT};
|
||||
use crate::config::{ProtocolConfig, Roles};
|
||||
use parking_lot::RwLock;
|
||||
use rustc_hex::ToHex;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::Arc;
|
||||
@@ -67,10 +68,26 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
|
||||
consensus_gossip: ConsensusGossip<B>,
|
||||
context_data: ContextData<B, H>,
|
||||
// Connected peers pending Status message.
|
||||
handshaking_peers: HashMap<NodeIndex, time::Instant>,
|
||||
handshaking_peers: HashMap<NodeIndex, HandshakingPeer>,
|
||||
// Connected peers from whom we received a Status message,
|
||||
// similar to context_data.peers but shared with the SyncProvider.
|
||||
connected_peers: Arc<RwLock<HashMap<NodeIndex, ConnectedPeer<B>>>>,
|
||||
transaction_pool: Arc<TransactionPool<H, B>>,
|
||||
}
|
||||
|
||||
/// A peer from whom we have received a Status message.
|
||||
#[derive(Clone)]
|
||||
pub struct ConnectedPeer<B: BlockT> {
|
||||
pub peer_info: PeerInfo<B>
|
||||
}
|
||||
|
||||
/// A peer that we are connected to
|
||||
/// and from whom we have not yet received a Status message.
|
||||
struct HandshakingPeer {
|
||||
timestamp: time::Instant,
|
||||
peer_id: PeerId,
|
||||
}
|
||||
|
||||
/// Syncing status and statistics
|
||||
#[derive(Clone)]
|
||||
pub struct ProtocolStatus<B: BlockT> {
|
||||
@@ -101,6 +118,8 @@ struct Peer<B: BlockT, H: ExHashT> {
|
||||
/// Info about a peer's known state.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PeerInfo<B: BlockT> {
|
||||
/// Network id.
|
||||
pub peer_id: PeerId,
|
||||
/// Roles
|
||||
pub roles: Roles,
|
||||
/// Protocol version
|
||||
@@ -213,8 +232,6 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> {
|
||||
ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>),
|
||||
/// Incoming gossip consensus message.
|
||||
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>),
|
||||
/// Return a list of peers currently known to protocol.
|
||||
Peers(Sender<Vec<(NodeIndex, PeerInfo<B>)>>),
|
||||
/// Tell protocol to abort sync (does not stop protocol).
|
||||
/// Only used in tests.
|
||||
#[cfg(any(test, feature = "test-helpers"))]
|
||||
@@ -228,7 +245,7 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> {
|
||||
/// Messages sent to Protocol from Network-libp2p.
|
||||
pub enum FromNetworkMsg<B: BlockT> {
|
||||
/// A peer connected, with debug info.
|
||||
PeerConnected(NodeIndex, String),
|
||||
PeerConnected(PeerId, NodeIndex, String),
|
||||
/// A peer disconnected, with debug info.
|
||||
PeerDisconnected(NodeIndex, String),
|
||||
/// A custom message from another peer.
|
||||
@@ -247,6 +264,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
pub fn new(
|
||||
is_offline: Arc<AtomicBool>,
|
||||
is_major_syncing: Arc<AtomicBool>,
|
||||
connected_peers: Arc<RwLock<HashMap<NodeIndex, ConnectedPeer<B>>>>,
|
||||
network_chan: NetworkChan<B>,
|
||||
config: ProtocolConfig,
|
||||
chain: Arc<Client<B>>,
|
||||
@@ -277,6 +295,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
specialization: specialization,
|
||||
consensus_gossip: ConsensusGossip::new(),
|
||||
handshaking_peers: HashMap::new(),
|
||||
connected_peers,
|
||||
transaction_pool: transaction_pool,
|
||||
};
|
||||
let tick_timeout = channel::tick(TICK_TIMEOUT);
|
||||
@@ -332,10 +351,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
|
||||
fn handle_client_msg(&mut self, msg: ProtocolMsg<B, S>) -> bool {
|
||||
match msg {
|
||||
ProtocolMsg::Peers(sender) => {
|
||||
let peers = self.context_data.peers.iter().map(|(idx, p)| (*idx, p.info.clone())).collect();
|
||||
let _ = sender.send(peers);
|
||||
},
|
||||
ProtocolMsg::Status(sender) => self.status(sender),
|
||||
ProtocolMsg::BlockImported(hash, header) => self.on_block_imported(hash, &header),
|
||||
ProtocolMsg::BlockFinalized(hash, header) => self.on_block_finalized(hash, &header),
|
||||
@@ -385,7 +400,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
fn handle_network_msg(&mut self, msg: FromNetworkMsg<B>) -> bool {
|
||||
match msg {
|
||||
FromNetworkMsg::PeerDisconnected(who, debug_info) => self.on_peer_disconnected(who, debug_info),
|
||||
FromNetworkMsg::PeerConnected(who, debug_info) => self.on_peer_connected(who, debug_info),
|
||||
FromNetworkMsg::PeerConnected(peer_id, who, debug_info) => self.on_peer_connected(peer_id, who, debug_info),
|
||||
FromNetworkMsg::PeerClogged(who, message) => self.on_clogged_peer(who, message),
|
||||
FromNetworkMsg::CustomMessage(who, message) => {
|
||||
self.on_custom_message(who, message)
|
||||
@@ -480,9 +495,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
}
|
||||
|
||||
/// Called when a new peer is connected
|
||||
fn on_peer_connected(&mut self, who: NodeIndex, debug_info: String) {
|
||||
fn on_peer_connected(&mut self, peer_id: PeerId, who: NodeIndex, debug_info: String) {
|
||||
trace!(target: "sync", "Connecting {}: {}", who, debug_info);
|
||||
self.handshaking_peers.insert(who, time::Instant::now());
|
||||
self.handshaking_peers.insert(who, HandshakingPeer { timestamp: time::Instant::now(), peer_id });
|
||||
self.send_status(who);
|
||||
}
|
||||
|
||||
@@ -492,6 +507,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
// lock all the the peer lists so that add/remove peer events are in order
|
||||
let removed = {
|
||||
self.handshaking_peers.remove(&peer);
|
||||
self.connected_peers.write().remove(&peer);
|
||||
self.context_data.peers.remove(&peer).is_some()
|
||||
};
|
||||
if removed {
|
||||
@@ -644,7 +660,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
aborting.push(*who);
|
||||
}
|
||||
}
|
||||
for (who, _) in self.handshaking_peers.iter().filter(|(_, t)| (tick - **t).as_secs() > REQUEST_TIMEOUT_SEC) {
|
||||
for (who, _) in self.handshaking_peers.iter().filter(|(_, handshaking)| (tick - handshaking.timestamp).as_secs() > REQUEST_TIMEOUT_SEC) {
|
||||
trace!(target: "sync", "Handshake timeout {}", who);
|
||||
aborting.push(*who);
|
||||
}
|
||||
@@ -711,13 +727,28 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
|
||||
let cache_limit = NonZeroUsize::new(1_000_000).expect("1_000_000 > 0; qed");
|
||||
|
||||
let peer = Peer {
|
||||
info: PeerInfo {
|
||||
protocol_version: status.version,
|
||||
roles: status.roles,
|
||||
best_hash: status.best_hash,
|
||||
best_number: status.best_number
|
||||
let info = match self.handshaking_peers.remove(&who) {
|
||||
Some(handshaking) => {
|
||||
let peer_info = PeerInfo {
|
||||
peer_id: handshaking.peer_id,
|
||||
protocol_version: status.version,
|
||||
roles: status.roles,
|
||||
best_hash: status.best_hash,
|
||||
best_number: status.best_number
|
||||
};
|
||||
self.connected_peers
|
||||
.write()
|
||||
.insert(who, ConnectedPeer { peer_info: peer_info.clone() });
|
||||
peer_info
|
||||
},
|
||||
None => {
|
||||
debug!(target: "sync", "Received status from previously unconnected node {}", who);
|
||||
return;
|
||||
},
|
||||
};
|
||||
|
||||
let peer = Peer {
|
||||
info,
|
||||
block_request: None,
|
||||
known_extrinsics: LruHashSet::new(cache_limit),
|
||||
known_blocks: LruHashSet::new(cache_limit),
|
||||
@@ -725,7 +756,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
obsolete_requests: HashMap::new(),
|
||||
};
|
||||
self.context_data.peers.insert(who.clone(), peer);
|
||||
self.handshaking_peers.remove(&who);
|
||||
|
||||
debug!(target: "sync", "Connected {}", who);
|
||||
}
|
||||
|
||||
@@ -769,7 +800,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
|
||||
let extrinsics = self.transaction_pool.transactions();
|
||||
let mut propagated_to = HashMap::new();
|
||||
for (who, ref mut peer) in self.context_data.peers.iter_mut() {
|
||||
for (who, peer) in self.context_data.peers.iter_mut() {
|
||||
let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
|
||||
.iter()
|
||||
.filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone()))
|
||||
@@ -777,18 +808,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
.unzip();
|
||||
|
||||
if !to_send.is_empty() {
|
||||
let (sender, port) = channel::unbounded();
|
||||
let _ = self
|
||||
.network_chan
|
||||
.send(NetworkMsg::GetPeerId(who.clone(), sender));
|
||||
let node_id = port.recv().expect("1. We are running 2. Network should be running too.");
|
||||
if let Some(id) = node_id {
|
||||
for hash in hashes {
|
||||
propagated_to
|
||||
.entry(hash)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(id.clone());
|
||||
}
|
||||
for hash in hashes {
|
||||
propagated_to
|
||||
.entry(hash)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(peer.info.peer_id.to_base58());
|
||||
}
|
||||
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
|
||||
self.network_chan.send(NetworkMsg::Outgoing(*who, GenericMessage::Transactions(to_send)))
|
||||
|
||||
Reference in New Issue
Block a user