From 76737891bbf46d7b21c98812ad08049cdcdf1c61 Mon Sep 17 00:00:00 2001 From: Guillaume Thiolliere Date: Tue, 23 Nov 2021 19:28:58 +0900 Subject: [PATCH] remove unused file (#10343) --- substrate/client/network/src/gossip.rs | 229 ----------------- substrate/client/network/src/gossip/tests.rs | 250 ------------------- 2 files changed, 479 deletions(-) delete mode 100644 substrate/client/network/src/gossip.rs delete mode 100644 substrate/client/network/src/gossip/tests.rs diff --git a/substrate/client/network/src/gossip.rs b/substrate/client/network/src/gossip.rs deleted file mode 100644 index 0bc46b2164..0000000000 --- a/substrate/client/network/src/gossip.rs +++ /dev/null @@ -1,229 +0,0 @@ -// This file is part of Substrate. - -// Copyright (C) 2017-2021 Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -//! Helper for sending rate-limited gossip messages. -//! -//! # Context -//! -//! The [`NetworkService`] struct provides a way to send notifications to a certain peer through -//! the [`NetworkService::notification_sender`] method. This method is quite low level and isn't -//! expected to be used directly. -//! -//! The [`QueuedSender`] struct provided by this module is built on top of -//! [`NetworkService::notification_sender`] and provides a cleaner way to send notifications. -//! -//! # Behaviour -//! -//! An instance of [`QueuedSender`] is specific to a certain combination of `PeerId` and -//! protocol name. It maintains a buffer of messages waiting to be sent out. The user of this API -//! is able to manipulate that queue, adding or removing obsolete messages. -//! -//! Creating a [`QueuedSender`] also returns a opaque `Future` whose responsibility it to -//! drain that queue and actually send the messages. If the substream with the given combination -//! of peer and protocol is closed, the queue is silently discarded. It is the role of the user -//! to track which peers we are connected to. -//! -//! In normal situations, messages sent through a [`QueuedSender`] will arrive in the same -//! order as they have been sent. -//! It is possible, in the situation of disconnects and reconnects, that messages arrive in a -//! different order. See also . -//! However, if multiple instances of [`QueuedSender`] exist for the same peer and protocol, or -//! if some other code uses the [`NetworkService`] to send notifications to this combination or -//! peer and protocol, then the notifications will be interleaved in an unpredictable way. -//! - -use crate::{ExHashT, NetworkService}; - -use async_std::sync::{Mutex, MutexGuard}; -use futures::prelude::*; -use futures::channel::mpsc::{channel, Receiver, Sender}; -use libp2p::PeerId; -use sp_runtime::traits::Block as BlockT; -use std::{ - borrow::Cow, - collections::VecDeque, - fmt, - sync::Arc, -}; - -#[cfg(test)] -mod tests; - -/// Notifications sender for a specific combination of network service, peer, and protocol. -pub struct QueuedSender { - /// Shared between the user-facing [`QueuedSender`] and the background future. - shared_message_queue: SharedMessageQueue, - /// Used to notify the background future to check for new messages in the message queue. - notify_background_future: Sender<()>, - /// Maximum number of elements in [`QueuedSender::shared_message_queue`]. - queue_size_limit: usize, -} - -impl QueuedSender { - /// Returns a new [`QueuedSender`] containing a queue of message for this specific - /// combination of peer and protocol. - /// - /// In addition to the [`QueuedSender`], also returns a `Future` whose role is to drive - /// the messages sending forward. - pub fn new( - service: Arc>, - peer_id: PeerId, - protocol: Cow<'static, str>, - queue_size_limit: usize, - messages_encode: F - ) -> (Self, impl Future + Send + 'static) - where - M: Send + 'static, - B: BlockT + 'static, - H: ExHashT, - F: Fn(M) -> Vec + Send + 'static, - { - let (notify_background_future, wait_for_sender) = channel(0); - - let shared_message_queue = Arc::new(Mutex::new( - VecDeque::with_capacity(queue_size_limit), - )); - - let background_future = create_background_future( - wait_for_sender, - service, - peer_id, - protocol, - shared_message_queue.clone(), - messages_encode - ); - - let sender = Self { - shared_message_queue, - notify_background_future, - queue_size_limit, - }; - - (sender, background_future) - } - - /// Locks the queue of messages towards this peer. - /// - /// The returned `Future` is expected to be ready quite quickly. - pub async fn lock_queue<'a>(&'a mut self) -> QueueGuard<'a, M> { - QueueGuard { - message_queue: self.shared_message_queue.lock().await, - queue_size_limit: self.queue_size_limit, - notify_background_future: &mut self.notify_background_future, - } - } - - /// Pushes a message to the queue, or discards it if the queue is full. - /// - /// The returned `Future` is expected to be ready quite quickly. - pub async fn queue_or_discard(&mut self, message: M) - where - M: Send + 'static - { - self.lock_queue().await.push_or_discard(message); - } -} - -impl fmt::Debug for QueuedSender { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("QueuedSender").finish() - } -} - -/// Locked queue of messages to the given peer. -/// -/// As long as this struct exists, the background future is asleep and the owner of the -/// [`QueueGuard`] is in total control of the message queue. Messages can only ever be sent out on -/// the network after the [`QueueGuard`] is dropped. -#[must_use] -pub struct QueueGuard<'a, M> { - message_queue: MutexGuard<'a, MessageQueue>, - /// Same as [`QueuedSender::queue_size_limit`]. - queue_size_limit: usize, - notify_background_future: &'a mut Sender<()>, -} - -impl<'a, M: Send + 'static> QueueGuard<'a, M> { - /// Pushes a message to the queue, or discards it if the queue is full. - /// - /// The message will only start being sent out after the [`QueueGuard`] is dropped. - pub fn push_or_discard(&mut self, message: M) { - if self.message_queue.len() < self.queue_size_limit { - self.message_queue.push_back(message); - } - } - - /// Calls `filter` for each message in the queue, and removes the ones for which `false` is - /// returned. - /// - /// > **Note**: The parameter of `filter` is a `&M` and not a `&mut M` (which would be - /// > better) because the underlying implementation relies on `VecDeque::retain`. - pub fn retain(&mut self, filter: impl FnMut(&M) -> bool) { - self.message_queue.retain(filter); - } -} - -impl<'a, M> Drop for QueueGuard<'a, M> { - fn drop(&mut self) { - // Notify background future to check for new messages in the message queue. - let _ = self.notify_background_future.try_send(()); - } -} - -type MessageQueue = VecDeque; - -/// [`MessageQueue`] shared between [`QueuedSender`] and background future. -type SharedMessageQueue = Arc>>; - -async fn create_background_future Vec>( - mut wait_for_sender: Receiver<()>, - service: Arc>, - peer_id: PeerId, - protocol: Cow<'static, str>, - shared_message_queue: SharedMessageQueue, - messages_encode: F, -) { - loop { - if wait_for_sender.next().await.is_none() { - return - } - - loop { - let mut queue_guard = shared_message_queue.lock().await; - let next_message = match queue_guard.pop_front() { - Some(msg) => msg, - None => break, - }; - drop(queue_guard); - - // Starting from below, we try to send the message. If an error happens when sending, - // the only sane option we have is to silently discard the message. - let sender = match service.notification_sender(peer_id.clone(), protocol.clone()) { - Ok(s) => s, - Err(_) => continue, - }; - - let ready = match sender.ready().await { - Ok(r) => r, - Err(_) => continue, - }; - - let _ = ready.send(messages_encode(next_message)); - } - } -} diff --git a/substrate/client/network/src/gossip/tests.rs b/substrate/client/network/src/gossip/tests.rs deleted file mode 100644 index 88c4160bc5..0000000000 --- a/substrate/client/network/src/gossip/tests.rs +++ /dev/null @@ -1,250 +0,0 @@ -// This file is part of Substrate. - -// Copyright (C) 2017-2021 Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -use crate::block_request_handler::BlockRequestHandler; -use crate::state_request_handler::StateRequestHandler; -use crate::light_client_requests::handler::LightClientRequestHandler; -use crate::gossip::QueuedSender; -use crate::{config, Event, NetworkService, NetworkWorker}; - -use futures::prelude::*; -use sp_runtime::traits::{Block as BlockT, Header as _}; -use std::{borrow::Cow, sync::Arc, time::Duration}; -use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _}; - -type TestNetworkService = NetworkService< - substrate_test_runtime_client::runtime::Block, - substrate_test_runtime_client::runtime::Hash, ->; - -/// Builds a full node to be used for testing. Returns the node service and its associated events -/// stream. -/// -/// > **Note**: We return the events stream in order to not possibly lose events between the -/// > construction of the service and the moment the events stream is grabbed. -fn build_test_full_node(network_config: config::NetworkConfiguration) - -> (Arc, impl Stream) -{ - let client = Arc::new( - TestClientBuilder::with_default_backend() - .build_with_longest_chain() - .0, - ); - - #[derive(Clone)] - struct PassThroughVerifier(bool); - - #[async_trait::async_trait] - impl sc_consensus::Verifier for PassThroughVerifier { - async fn verify( - &mut self, - mut block: sp_consensus::BlockImportParams, - ) -> Result< - ( - sc_consensus::BlockImportParams, - Option)>>, - ), - String, - > { - let maybe_keys = block.header - .digest() - .log(|l| { - l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus(b"aura")) - .or_else(|| { - l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus(b"babe")) - }) - }) - .map(|blob| { - vec![( - sp_blockchain::well_known_cache_keys::AUTHORITIES, - blob.to_vec(), - )] - }); - - block.finalized = self.0; - block.fork_choice = Some(sc_consensus::ForkChoiceStrategy::LongestChain); - Ok((block, maybe_keys)) - } - } - - let import_queue = Box::new(sc_consensus::BasicQueue::new( - PassThroughVerifier(false), - Box::new(client.clone()), - None, - &sp_core::testing::TaskExecutor::new(), - None, - )); - - let protocol_id = config::ProtocolId::from("/test-protocol-name"); - - let block_request_protocol_config = { - let (handler, protocol_config) = BlockRequestHandler::new( - &protocol_id, - client.clone(), - 50, - ); - async_std::task::spawn(handler.run().boxed()); - protocol_config - }; - - let state_request_protocol_config = { - let (handler, protocol_config) = StateRequestHandler::new( - &protocol_id, - client.clone(), - 50, - ); - async_std::task::spawn(handler.run().boxed()); - protocol_config - }; - - let light_client_request_protocol_config = { - let (handler, protocol_config) = LightClientRequestHandler::new( - &protocol_id, - client.clone(), - ); - async_std::task::spawn(handler.run().boxed()); - protocol_config - }; - - let worker = NetworkWorker::new(config::Params { - role: config::Role::Full, - executor: None, - transactions_handler_executor: Box::new(|task| { async_std::task::spawn(task); }), - network_config, - chain: client.clone(), - on_demand: None, - transaction_pool: Arc::new(crate::config::EmptyTransactionPool), - protocol_id, - import_queue, - block_announce_validator: Box::new( - sp_consensus::block_validation::DefaultBlockAnnounceValidator, - ), - metrics_registry: None, - block_request_protocol_config, - state_request_protocol_config, - light_client_request_protocol_config, - warp_sync: None, - }) - .unwrap(); - - let service = worker.service().clone(); - let event_stream = service.event_stream("test"); - - async_std::task::spawn(async move { - futures::pin_mut!(worker); - let _ = worker.await; - }); - - (service, event_stream) -} - -const PROTOCOL_NAME: Cow<'static, str> = Cow::Borrowed("/foo"); - -/// Builds two nodes and their associated events stream. -/// The nodes are connected together and have the `PROTOCOL_NAME` protocol registered. -fn build_nodes_one_proto() - -> (Arc, impl Stream, Arc, impl Stream) -{ - let listen_addr = config::build_multiaddr![Memory(rand::random::())]; - - let (node1, events_stream1) = build_test_full_node(config::NetworkConfiguration { - extra_sets: vec![ - config::NonDefaultSetConfig { - notifications_protocol: PROTOCOL_NAME, - fallback_names: Vec::new(), - max_notification_size: 1024 * 1024, - set_config: Default::default() - } - ], - listen_addresses: vec![listen_addr.clone()], - transport: config::TransportConfig::MemoryOnly, - .. config::NetworkConfiguration::new_local() - }); - - let (node2, events_stream2) = build_test_full_node(config::NetworkConfiguration { - listen_addresses: vec![], - extra_sets: vec![ - config::NonDefaultSetConfig { - notifications_protocol: PROTOCOL_NAME, - fallback_names: Vec::new(), - max_notification_size: 1024 * 1024, - set_config: config::SetConfig { - reserved_nodes: vec![config::MultiaddrWithPeerId { - multiaddr: listen_addr, - peer_id: node1.local_peer_id().clone(), - }], - .. Default::default() - }, - } - ], - transport: config::TransportConfig::MemoryOnly, - .. config::NetworkConfiguration::new_local() - }); - - (node1, events_stream1, node2, events_stream2) -} - -#[test] -fn basic_works() { - const NUM_NOTIFS: usize = 256; - - let (node1, mut events_stream1, node2, mut events_stream2) = build_nodes_one_proto(); - let node2_id = node2.local_peer_id().clone(); - - let receiver = async_std::task::spawn(async move { - let mut received_notifications = 0; - - while received_notifications < NUM_NOTIFS { - match events_stream2.next().await.unwrap() { - Event::NotificationStreamClosed { .. } => panic!(), - Event::NotificationsReceived { messages, .. } => { - for message in messages { - assert_eq!(message.0, PROTOCOL_NAME); - assert_eq!(message.1, &b"message"[..]); - received_notifications += 1; - } - } - _ => {} - }; - - if rand::random::() < 2 { - async_std::task::sleep(Duration::from_millis(rand::random::() % 750)).await; - } - } - }); - - async_std::task::block_on(async move { - let (mut sender, bg_future) = - QueuedSender::new(node1, node2_id, PROTOCOL_NAME, NUM_NOTIFS, |msg| msg); - async_std::task::spawn(bg_future); - - // Wait for the `NotificationStreamOpened`. - loop { - match events_stream1.next().await.unwrap() { - Event::NotificationStreamOpened { .. } => break, - _ => {} - }; - } - - for _ in 0..NUM_NOTIFS { - sender.queue_or_discard(b"message".to_vec()).await; - } - - receiver.await; - }); -}