mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 22:07:58 +00:00
Small refactoring. (#1826)
This commit is contained in:
committed by
Robert Habermeier
parent
8065116ba5
commit
c5d3da32f2
@@ -82,14 +82,7 @@ pub struct ProtocolStatus<B: BlockT> {
|
||||
/// Peer information
|
||||
#[derive(Debug)]
|
||||
struct Peer<B: BlockT, H: ExHashT> {
|
||||
/// Protocol version
|
||||
protocol_version: u32,
|
||||
/// Roles
|
||||
roles: Roles,
|
||||
/// Peer best block hash
|
||||
best_hash: B::Hash,
|
||||
/// Peer best block number
|
||||
best_number: <B::Header as HeaderT>::Number,
|
||||
info: PeerInfo<B>,
|
||||
/// Current block request, if any.
|
||||
block_request: Option<(time::Instant, message::BlockRequest<B>)>,
|
||||
/// Requests we are no longer insterested in.
|
||||
@@ -131,61 +124,28 @@ pub trait Context<B: BlockT> {
|
||||
}
|
||||
|
||||
/// Protocol context.
|
||||
pub(crate) struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> {
|
||||
struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> {
|
||||
network_chan: &'a NetworkChan<B>,
|
||||
context_data: &'a mut ContextData<B, H>,
|
||||
}
|
||||
|
||||
impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> {
|
||||
pub(crate) fn new(
|
||||
context_data: &'a mut ContextData<B, H>,
|
||||
network_chan: &'a NetworkChan<B>,
|
||||
) -> Self {
|
||||
ProtocolContext {
|
||||
network_chan,
|
||||
context_data,
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a message to a peer.
|
||||
pub fn send_message(&mut self, who: NodeIndex, message: Message<B>) {
|
||||
send_message(
|
||||
&mut self.context_data.peers,
|
||||
&self.network_chan,
|
||||
who,
|
||||
message,
|
||||
)
|
||||
}
|
||||
|
||||
/// Point out that a peer has been malign or irresponsible or appeared lazy.
|
||||
pub fn report_peer(&mut self, who: NodeIndex, reason: Severity) {
|
||||
let _ = self
|
||||
.network_chan
|
||||
.send(NetworkMsg::ReportPeer(who, reason));
|
||||
}
|
||||
|
||||
/// Get peer info.
|
||||
pub fn peer_info(&self, peer: NodeIndex) -> Option<PeerInfo<B>> {
|
||||
self.context_data.peers.get(&peer).map(|p| PeerInfo {
|
||||
roles: p.roles,
|
||||
protocol_version: p.protocol_version,
|
||||
best_hash: p.best_hash,
|
||||
best_number: p.best_number,
|
||||
})
|
||||
fn new(context_data: &'a mut ContextData<B, H>, network_chan: &'a NetworkChan<B>) -> Self {
|
||||
ProtocolContext { network_chan, context_data }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B, H> {
|
||||
fn send_message(&mut self, who: NodeIndex, message: Message<B>) {
|
||||
ProtocolContext::send_message(self, who, message);
|
||||
send_message(&mut self.context_data.peers, &self.network_chan, who, message)
|
||||
}
|
||||
|
||||
fn report_peer(&mut self, who: NodeIndex, reason: Severity) {
|
||||
ProtocolContext::report_peer(self, who, reason);
|
||||
self.network_chan.send(NetworkMsg::ReportPeer(who, reason))
|
||||
}
|
||||
|
||||
fn peer_info(&self, who: NodeIndex) -> Option<PeerInfo<B>> {
|
||||
ProtocolContext::peer_info(self, who)
|
||||
self.context_data.peers.get(&who).map(|p| p.info.clone())
|
||||
}
|
||||
|
||||
fn client(&self) -> &Client<B> {
|
||||
@@ -194,7 +154,7 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B,
|
||||
}
|
||||
|
||||
/// Data necessary to create a context.
|
||||
pub(crate) struct ContextData<B: BlockT, H: ExHashT> {
|
||||
struct ContextData<B: BlockT, H: ExHashT> {
|
||||
// All connected peers
|
||||
peers: HashMap<NodeIndex, Peer<B, H>>,
|
||||
pub chain: Arc<Client<B>>,
|
||||
@@ -345,17 +305,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
fn handle_msg(&mut self, msg: ProtocolMsg<B, S>) -> bool {
|
||||
match msg {
|
||||
ProtocolMsg::Peers(sender) => {
|
||||
let peers = self.context_data.peers.iter().map(|(idx, p)| {
|
||||
(
|
||||
*idx,
|
||||
PeerInfo {
|
||||
roles: p.roles,
|
||||
protocol_version: p.protocol_version,
|
||||
best_hash: p.best_hash,
|
||||
best_number: p.best_number,
|
||||
}
|
||||
)
|
||||
}).collect();
|
||||
let peers = self.context_data.peers.iter().map(|(idx, p)| (*idx, p.info.clone())).collect();
|
||||
let _ = sender.send(peers);
|
||||
},
|
||||
ProtocolMsg::PeerDisconnected(who, debug_info) => self.on_peer_disconnected(who, debug_info),
|
||||
@@ -430,9 +380,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
return request.map(|(_, r)| r)
|
||||
}
|
||||
trace!(target: "sync", "Unexpected response packet from {} ({})", who, response.id,);
|
||||
let _ = self
|
||||
.network_chan
|
||||
.send(NetworkMsg::ReportPeer(who, Severity::Bad("Unexpected response packet received from peer".to_string())));
|
||||
let severity = Severity::Bad("Unexpected response packet received from peer".to_string());
|
||||
self.network_chan.send(NetworkMsg::ReportPeer(who, severity))
|
||||
}
|
||||
None
|
||||
}
|
||||
@@ -532,17 +481,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
|
||||
/// Called as a back-pressure mechanism if the networking detects that the peer cannot process
|
||||
/// our messaging rate fast enough.
|
||||
pub fn on_clogged_peer(
|
||||
&self,
|
||||
who: NodeIndex,
|
||||
_message: Option<Message<B>>,
|
||||
) {
|
||||
pub fn on_clogged_peer(&self, who: NodeIndex, _msg: Option<Message<B>>) {
|
||||
// We don't do anything but print some diagnostics for now.
|
||||
if let Some(peer) = self.context_data.peers.get(&who) {
|
||||
debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \
|
||||
known_extrinsics: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})",
|
||||
who, peer.protocol_version, peer.roles, peer.known_extrinsics, peer.known_blocks,
|
||||
peer.best_hash, peer.best_number);
|
||||
who, peer.info.protocol_version, peer.info.roles, peer.known_extrinsics, peer.known_blocks,
|
||||
peer.info.best_hash, peer.info.best_number);
|
||||
} else {
|
||||
debug!(target: "sync", "Peer clogged before being properly connected");
|
||||
}
|
||||
@@ -573,7 +518,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
let number = header.number().clone();
|
||||
let hash = header.hash();
|
||||
let parent_hash = header.parent_hash().clone();
|
||||
let justification = if get_justification { self.context_data.chain.justification(&BlockId::Hash(hash)).unwrap_or(None) } else { None };
|
||||
let justification = if get_justification {
|
||||
self.context_data.chain.justification(&BlockId::Hash(hash)).unwrap_or(None)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let block_data = message::generic::BlockData {
|
||||
hash: hash,
|
||||
header: if get_header { Some(header) } else { None },
|
||||
@@ -677,11 +626,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
}
|
||||
}
|
||||
|
||||
self.specialization
|
||||
.maintain_peers(&mut ProtocolContext::new(
|
||||
&mut self.context_data,
|
||||
&self.network_chan,
|
||||
));
|
||||
self.specialization.maintain_peers(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan));
|
||||
for p in aborting {
|
||||
let _ = self
|
||||
.network_chan
|
||||
@@ -689,16 +634,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn peer_info(&mut self, peer: NodeIndex) -> Option<PeerInfo<B>> {
|
||||
self.context_data.peers.get(&peer).map(|p| PeerInfo {
|
||||
roles: p.roles,
|
||||
protocol_version: p.protocol_version,
|
||||
best_hash: p.best_hash,
|
||||
best_number: p.best_number,
|
||||
})
|
||||
}
|
||||
|
||||
/// Called by peer to report status
|
||||
fn on_status_message(&mut self, who: NodeIndex, status: message::Status<B>) {
|
||||
trace!(target: "sync", "New peer {} {:?}", who, status);
|
||||
@@ -751,10 +686,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
}
|
||||
|
||||
let peer = Peer {
|
||||
protocol_version: status.version,
|
||||
roles: status.roles,
|
||||
best_hash: status.best_hash,
|
||||
best_number: status.best_number,
|
||||
info: PeerInfo {
|
||||
protocol_version: status.version,
|
||||
roles: status.roles,
|
||||
best_hash: status.best_hash,
|
||||
best_number: status.best_number
|
||||
},
|
||||
block_request: None,
|
||||
known_extrinsics: HashSet::new(),
|
||||
known_blocks: HashSet::new(),
|
||||
@@ -805,8 +742,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
}
|
||||
|
||||
let extrinsics = self.transaction_pool.transactions();
|
||||
// FIXME: find a way to remove this vec. https://github.com/paritytech/substrate/issues/1698
|
||||
let mut will_send = vec![];
|
||||
let mut propagated_to = HashMap::new();
|
||||
for (who, ref mut peer) in self.context_data.peers.iter_mut() {
|
||||
let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
|
||||
@@ -830,12 +765,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
}
|
||||
}
|
||||
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
|
||||
will_send.push((who.clone(), to_send));
|
||||
self.network_chan.send(NetworkMsg::Outgoing(*who, GenericMessage::Transactions(to_send)))
|
||||
}
|
||||
}
|
||||
for (who, to_send) in will_send {
|
||||
self.send_message(who, GenericMessage::Transactions(to_send));
|
||||
}
|
||||
self.transaction_pool.on_broadcasted(propagated_to);
|
||||
}
|
||||
|
||||
@@ -857,17 +789,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
};
|
||||
let hash = header.hash();
|
||||
|
||||
// FIXME: find a way to remove this vec. https://github.com/paritytech/substrate/issues/1698
|
||||
let mut to_send = vec![];
|
||||
let message = GenericMessage::BlockAnnounce(message::BlockAnnounce { header: header.clone() });
|
||||
|
||||
for (who, ref mut peer) in self.context_data.peers.iter_mut() {
|
||||
trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who);
|
||||
peer.known_blocks.insert(hash);
|
||||
to_send.push(who.clone());
|
||||
}
|
||||
for who in to_send {
|
||||
self.send_message(who, GenericMessage::BlockAnnounce(message::BlockAnnounce {
|
||||
header: header.clone()
|
||||
}));
|
||||
self.network_chan.send(NetworkMsg::Outgoing(*who, message.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -922,7 +849,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
}
|
||||
|
||||
fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) {
|
||||
self.sync.update_chain_info(&header);
|
||||
self.sync.update_chain_info(header);
|
||||
self.specialization.on_block_imported(
|
||||
&mut ProtocolContext::new(&mut self.context_data, &self.network_chan),
|
||||
hash.clone(),
|
||||
@@ -936,22 +863,14 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
|
||||
// send out block announcements
|
||||
|
||||
// FIXME: find a way to remove this vec. https://github.com/paritytech/substrate/issues/1698
|
||||
let mut to_send = vec![];
|
||||
let message = GenericMessage::BlockAnnounce(message::BlockAnnounce { header: header.clone() });
|
||||
|
||||
for (who, ref mut peer) in self.context_data.peers.iter_mut() {
|
||||
if peer.known_blocks.insert(hash.clone()) {
|
||||
trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
|
||||
to_send.push(who.clone());
|
||||
self.network_chan.send(NetworkMsg::Outgoing(*who, message.clone()))
|
||||
}
|
||||
}
|
||||
for who in to_send {
|
||||
self.send_message(
|
||||
who,
|
||||
GenericMessage::BlockAnnounce(message::BlockAnnounce {
|
||||
header: header.clone(),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) {
|
||||
@@ -1111,19 +1030,16 @@ fn send_message<B: BlockT, H: ExHashT>(
|
||||
who: NodeIndex,
|
||||
mut message: Message<B>,
|
||||
) {
|
||||
match message {
|
||||
GenericMessage::BlockRequest(ref mut r) => {
|
||||
if let Some(ref mut peer) = peers.get_mut(&who) {
|
||||
r.id = peer.next_request_id;
|
||||
peer.next_request_id = peer.next_request_id + 1;
|
||||
if let Some((timestamp, request)) = peer.block_request.take() {
|
||||
trace!(target: "sync", "Request {} for {} is now obsolete.", request.id, who);
|
||||
peer.obsolete_requests.insert(request.id, timestamp);
|
||||
}
|
||||
peer.block_request = Some((time::Instant::now(), r.clone()));
|
||||
if let GenericMessage::BlockRequest(ref mut r) = message {
|
||||
if let Some(ref mut peer) = peers.get_mut(&who) {
|
||||
r.id = peer.next_request_id;
|
||||
peer.next_request_id = peer.next_request_id + 1;
|
||||
if let Some((timestamp, request)) = peer.block_request.take() {
|
||||
trace!(target: "sync", "Request {} for {} is now obsolete.", request.id, who);
|
||||
peer.obsolete_requests.insert(request.id, timestamp);
|
||||
}
|
||||
peer.block_request = Some((time::Instant::now(), r.clone()));
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
network_chan.send(NetworkMsg::Outgoing(who, message));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user