mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 22:21:07 +00:00
Fix NotificationStreamClosed reported when it shouldn't (#5160)
* Fix NotificationStreamClosed reported when it shouldn't * Fix test * Add test * Update client/network/src/protocol.rs Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com> * Oops, fix test Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
@@ -21,7 +21,7 @@
|
||||
|
||||
pub use crate::chain::{Client, FinalityProofProvider};
|
||||
pub use crate::on_demand_layer::OnDemand;
|
||||
pub use crate::service::TransactionPool;
|
||||
pub use crate::service::{TransactionPool, EmptyTransactionPool};
|
||||
pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multiaddr};
|
||||
|
||||
// Note: this re-export shouldn't be part of the public API of the crate and will be removed in
|
||||
|
||||
@@ -779,7 +779,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
}
|
||||
|
||||
/// Called by peer when it is disconnecting
|
||||
pub fn on_peer_disconnected(&mut self, peer: PeerId) {
|
||||
pub fn on_peer_disconnected(&mut self, peer: PeerId) -> CustomMessageOutcome<B> {
|
||||
if self.important_peers.contains(&peer) {
|
||||
warn!(target: "sync", "Reserved peer {} disconnected", peer);
|
||||
} else {
|
||||
@@ -796,7 +796,15 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
self.light_dispatch.on_disconnect(LightDispatchIn {
|
||||
behaviour: &mut self.behaviour,
|
||||
peerset: self.peerset_handle.clone(),
|
||||
}, peer);
|
||||
}, &peer);
|
||||
|
||||
// Notify all the notification protocols as closed.
|
||||
CustomMessageOutcome::NotificationStreamClosed {
|
||||
remote: peer,
|
||||
protocols: self.protocol_name_by_engine.keys().cloned().collect(),
|
||||
}
|
||||
} else {
|
||||
CustomMessageOutcome::None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2063,12 +2071,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
|
||||
CustomMessageOutcome::None
|
||||
}
|
||||
GenericProtoOut::CustomProtocolClosed { peer_id, .. } => {
|
||||
self.on_peer_disconnected(peer_id.clone());
|
||||
// Notify all the notification protocols as closed.
|
||||
CustomMessageOutcome::NotificationStreamClosed {
|
||||
remote: peer_id,
|
||||
protocols: self.protocol_name_by_engine.keys().cloned().collect(),
|
||||
}
|
||||
self.on_peer_disconnected(peer_id.clone())
|
||||
},
|
||||
GenericProtoOut::CustomMessage { peer_id, message } =>
|
||||
self.on_custom_message(peer_id, message),
|
||||
@@ -2139,3 +2142,50 @@ impl<B: BlockT, H: ExHashT> Drop for Protocol<B, H> {
|
||||
debug!(target: "sync", "Network stats:\n{}", self.format_stats());
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::PeerId;
|
||||
use crate::protocol::light_dispatch::AlwaysBadChecker;
|
||||
use crate::config::{EmptyTransactionPool, Roles};
|
||||
use super::{CustomMessageOutcome, Protocol, ProtocolConfig};
|
||||
|
||||
use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
|
||||
use std::sync::Arc;
|
||||
use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt};
|
||||
use substrate_test_runtime_client::runtime::{Block, Hash};
|
||||
|
||||
#[test]
|
||||
fn no_handshake_no_notif_closed() {
|
||||
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);
|
||||
|
||||
let (mut protocol, _) = Protocol::<Block, Hash>::new(
|
||||
ProtocolConfig {
|
||||
roles: Roles::FULL,
|
||||
max_parallel_downloads: 10,
|
||||
},
|
||||
client.clone(),
|
||||
Arc::new(AlwaysBadChecker),
|
||||
Arc::new(EmptyTransactionPool),
|
||||
None,
|
||||
None,
|
||||
From::from(&b"test"[..]),
|
||||
sc_peerset::PeersetConfig {
|
||||
in_peers: 10,
|
||||
out_peers: 10,
|
||||
bootnodes: Vec::new(),
|
||||
reserved_only: false,
|
||||
reserved_nodes: Vec::new(),
|
||||
},
|
||||
Box::new(DefaultBlockAnnounceValidator::new(client.clone())),
|
||||
None
|
||||
).unwrap();
|
||||
|
||||
let dummy_peer_id = PeerId::random();
|
||||
let _ = protocol.on_peer_connected(dummy_peer_id.clone());
|
||||
match protocol.on_peer_disconnected(dummy_peer_id) {
|
||||
CustomMessageOutcome::None => {},
|
||||
_ => panic!()
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -273,7 +273,7 @@ impl<B: BlockT> LightDispatch<B> where
|
||||
info!("Invalid remote {} response from peer {}", rtype, peer);
|
||||
network.report_peer(&peer, ReputationChange::new_fatal("Invalid remote response"));
|
||||
network.disconnect_peer(&peer);
|
||||
self.remove_peer(peer);
|
||||
self.remove_peer(&peer);
|
||||
return;
|
||||
},
|
||||
};
|
||||
@@ -285,7 +285,7 @@ impl<B: BlockT> LightDispatch<B> where
|
||||
info!("Failed to check remote {} response from peer {}: {}", rtype, peer, error);
|
||||
network.report_peer(&peer, ReputationChange::new_fatal("Failed remote response check"));
|
||||
network.disconnect_peer(&peer);
|
||||
self.remove_peer(peer);
|
||||
self.remove_peer(&peer);
|
||||
|
||||
if retry_count > 0 {
|
||||
(retry_count - 1, Some(retry_request_data))
|
||||
@@ -299,7 +299,7 @@ impl<B: BlockT> LightDispatch<B> where
|
||||
info!("Unexpected response to remote {} from peer", rtype);
|
||||
network.report_peer(&peer, ReputationChange::new_fatal("Unexpected remote response"));
|
||||
network.disconnect_peer(&peer);
|
||||
self.remove_peer(peer);
|
||||
self.remove_peer(&peer);
|
||||
|
||||
(retry_count, Some(retry_request_data))
|
||||
},
|
||||
@@ -337,7 +337,7 @@ impl<B: BlockT> LightDispatch<B> where
|
||||
}
|
||||
|
||||
/// Call this when we disconnect from a node.
|
||||
pub fn on_disconnect(&mut self, network: impl LightDispatchNetwork<B>, peer: PeerId) {
|
||||
pub fn on_disconnect(&mut self, network: impl LightDispatchNetwork<B>, peer: &PeerId) {
|
||||
self.remove_peer(peer);
|
||||
self.dispatch(network);
|
||||
}
|
||||
@@ -523,15 +523,15 @@ impl<B: BlockT> LightDispatch<B> where
|
||||
/// Removes a peer from the list of known peers.
|
||||
///
|
||||
/// Puts back the active request that this node was performing into `pending_requests`.
|
||||
fn remove_peer(&mut self, peer: PeerId) {
|
||||
self.best_blocks.remove(&peer);
|
||||
fn remove_peer(&mut self, peer: &PeerId) {
|
||||
self.best_blocks.remove(peer);
|
||||
|
||||
if let Some(request) = self.active_peers.remove(&peer) {
|
||||
if let Some(request) = self.active_peers.remove(peer) {
|
||||
self.pending_requests.push_front(request);
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(idle_index) = self.idle_peers.iter().position(|i| *i == peer) {
|
||||
if let Some(idle_index) = self.idle_peers.iter().position(|i| i == peer) {
|
||||
self.idle_peers.swap_remove_back(idle_index);
|
||||
}
|
||||
}
|
||||
@@ -860,7 +860,7 @@ pub mod tests {
|
||||
assert_eq!(1, total_peers(&light_dispatch));
|
||||
assert!(!light_dispatch.best_blocks.is_empty());
|
||||
|
||||
light_dispatch.on_disconnect(&mut network_interface, peer0);
|
||||
light_dispatch.on_disconnect(&mut network_interface, &peer0);
|
||||
assert_eq!(0, total_peers(&light_dispatch));
|
||||
assert!(light_dispatch.best_blocks.is_empty());
|
||||
}
|
||||
|
||||
@@ -80,6 +80,37 @@ pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
|
||||
fn transaction(&self, hash: &H) -> Option<B::Extrinsic>;
|
||||
}
|
||||
|
||||
/// Dummy implementation of the [`TransactionPool`] trait for a transaction pool that is always
|
||||
/// empty and discards all incoming transactions.
|
||||
///
|
||||
/// Requires the "hash" type to implement the `Default` trait.
|
||||
///
|
||||
/// Useful for testing purposes.
|
||||
pub struct EmptyTransactionPool;
|
||||
|
||||
impl<H: ExHashT + Default, B: BlockT> TransactionPool<H, B> for EmptyTransactionPool {
|
||||
fn transactions(&self) -> Vec<(H, B::Extrinsic)> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
fn hash_of(&self, _transaction: &B::Extrinsic) -> H {
|
||||
Default::default()
|
||||
}
|
||||
|
||||
fn import(
|
||||
&self,
|
||||
_report_handle: ReportHandle,
|
||||
_who: PeerId,
|
||||
_rep_change_good: ReputationChange,
|
||||
_rep_change_bad: ReputationChange,
|
||||
_transaction: B::Extrinsic
|
||||
) {}
|
||||
|
||||
fn on_broadcasted(&self, _: HashMap<H, Vec<String>>) {}
|
||||
|
||||
fn transaction(&self, _h: &H) -> Option<B::Extrinsic> { None }
|
||||
}
|
||||
|
||||
/// A cloneable handle for reporting cost/benefits of peers.
|
||||
#[derive(Clone)]
|
||||
pub struct ReportHandle {
|
||||
|
||||
@@ -46,17 +46,18 @@ use sp_consensus::block_import::{BlockImport, ImportResult};
|
||||
use sp_consensus::Error as ConsensusError;
|
||||
use sp_consensus::{BlockOrigin, ForkChoiceStrategy, BlockImportParams, BlockCheckParams, JustificationImport};
|
||||
use futures::prelude::*;
|
||||
use sc_network::{NetworkWorker, NetworkStateInfo, NetworkService, ReportHandle, config::ProtocolId};
|
||||
use sc_network::{NetworkWorker, NetworkStateInfo, NetworkService, config::ProtocolId};
|
||||
use sc_network::config::{NetworkConfiguration, TransportConfig, BoxFinalityProofRequestBuilder};
|
||||
use libp2p::PeerId;
|
||||
use parking_lot::Mutex;
|
||||
use sp_core::H256;
|
||||
use sc_network::config::{ProtocolConfig, TransactionPool};
|
||||
use sc_network::config::ProtocolConfig;
|
||||
use sp_runtime::generic::{BlockId, OpaqueDigestItemId};
|
||||
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
|
||||
use sp_runtime::Justification;
|
||||
use substrate_test_runtime_client::{self, AccountKeyring};
|
||||
|
||||
pub use sc_network::config::EmptyTransactionPool;
|
||||
pub use substrate_test_runtime_client::runtime::{Block, Extrinsic, Hash, Transfer};
|
||||
pub use substrate_test_runtime_client::{TestClient, TestClientBuilder, TestClientBuilderExt};
|
||||
|
||||
@@ -382,31 +383,6 @@ impl<D> Peer<D> {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct EmptyTransactionPool;
|
||||
|
||||
impl TransactionPool<Hash, Block> for EmptyTransactionPool {
|
||||
fn transactions(&self) -> Vec<(Hash, Extrinsic)> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
fn hash_of(&self, _transaction: &Extrinsic) -> Hash {
|
||||
Hash::default()
|
||||
}
|
||||
|
||||
fn import(
|
||||
&self,
|
||||
_report_handle: ReportHandle,
|
||||
_who: PeerId,
|
||||
_rep_change_good: sc_network::ReputationChange,
|
||||
_rep_change_bad: sc_network::ReputationChange,
|
||||
_transaction: Extrinsic
|
||||
) {}
|
||||
|
||||
fn on_broadcasted(&self, _: HashMap<Hash, Vec<String>>) {}
|
||||
|
||||
fn transaction(&self, _h: &Hash) -> Option<Extrinsic> { None }
|
||||
}
|
||||
|
||||
/// Implements `BlockImport` for any `Transaction`. Internally the transaction is
|
||||
/// "converted", aka the field is set to `None`.
|
||||
///
|
||||
|
||||
Reference in New Issue
Block a user