Move transactions protocol to its own crate (#12264)

* Move transaction protocol to its own crate

* Update Cargo.lock

* Fix binaries

* Update client/network/transactions/src/lib.rs

Co-authored-by: Dmitry Markin <dmitry@markin.tech>

* Update client/service/src/builder.rs

Co-authored-by: Bastian Köcher <info@kchr.de>

* Apply review comments

* Revert one change and apply cargo-fmt

* Remove Transaction from Message

* Add array-bytes

* trigger CI

* Add comment about codec index

Co-authored-by: Dmitry Markin <dmitry@markin.tech>
Co-authored-by: Bastian Köcher <info@kchr.de>
This commit is contained in:
Aaro Altonen
2022-09-26 15:10:09 +03:00
committed by GitHub
parent ea377d0b17
commit 4c19c13d05
32 changed files with 466 additions and 471 deletions
+6 -203
View File
@@ -27,24 +27,24 @@ pub use sc_network_common::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
sync::warp::WarpSyncProvider,
ExHashT,
};
pub use libp2p::{build_multiaddr, core::PublicKey, identity};
use crate::ExHashT;
use core::{fmt, iter};
use futures::future;
use libp2p::{
identity::{ed25519, Keypair},
multiaddr, Multiaddr,
};
use prometheus_endpoint::Registry;
use sc_consensus::ImportQueue;
use sc_network_common::{config::MultiaddrWithPeerId, protocol::ProtocolName, sync::ChainSync};
use sc_network_common::{
config::{MultiaddrWithPeerId, NonDefaultSetConfig, SetConfig, TransportConfig},
sync::ChainSync,
};
use sp_runtime::traits::Block as BlockT;
use std::{
collections::HashMap,
error::Error,
fs,
future::Future,
@@ -52,16 +52,14 @@ use std::{
net::Ipv4Addr,
path::{Path, PathBuf},
pin::Pin,
str,
sync::Arc,
};
use zeroize::Zeroize;
/// Network initialization parameters.
pub struct Params<B, H, Client>
pub struct Params<B, Client>
where
B: BlockT + 'static,
H: ExHashT,
{
/// Assigned role for our node (full, light, ...).
pub role: Role,
@@ -70,21 +68,12 @@ where
/// default.
pub executor: Option<Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>>,
/// How to spawn the background task dedicated to the transactions handler.
pub transactions_handler_executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>,
/// Network layer configuration.
pub network_config: NetworkConfiguration,
/// Client that contains the blockchain.
pub chain: Arc<Client>,
/// Pool of transactions.
///
/// The network worker will fetch transactions from this object in order to propagate them on
/// the network.
pub transaction_pool: Arc<dyn TransactionPool<H, B>>,
/// Legacy name of the protocol to use on the wire. Should be different for each chain.
pub protocol_id: ProtocolId,
@@ -166,66 +155,6 @@ impl fmt::Display for Role {
}
}
/// Result of the transaction import.
#[derive(Clone, Copy, Debug)]
pub enum TransactionImport {
/// Transaction is good but already known by the transaction pool.
KnownGood,
/// Transaction is good and not yet known.
NewGood,
/// Transaction is invalid.
Bad,
/// Transaction import was not performed.
None,
}
/// Future resolving to transaction import result.
pub type TransactionImportFuture = Pin<Box<dyn Future<Output = TransactionImport> + Send>>;
/// Transaction pool interface
pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
/// Get transactions from the pool that are ready to be propagated.
fn transactions(&self) -> Vec<(H, B::Extrinsic)>;
/// Get hash of transaction.
fn hash_of(&self, transaction: &B::Extrinsic) -> H;
/// Import a transaction into the pool.
///
/// This will return future.
fn import(&self, transaction: B::Extrinsic) -> TransactionImportFuture;
/// Notify the pool about transactions broadcast.
fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>);
/// Get transaction by hash.
fn transaction(&self, hash: &H) -> Option<B::Extrinsic>;
}
/// Dummy implementation of the [`TransactionPool`] trait for a transaction pool that is always
/// empty and discards all incoming transactions.
///
/// Requires the "hash" type to implement the `Default` trait.
///
/// Useful for testing purposes.
pub struct EmptyTransactionPool;
impl<H: ExHashT + Default, B: BlockT> TransactionPool<H, B> for EmptyTransactionPool {
fn transactions(&self) -> Vec<(H, B::Extrinsic)> {
Vec::new()
}
fn hash_of(&self, _transaction: &B::Extrinsic) -> H {
Default::default()
}
fn import(&self, _transaction: B::Extrinsic) -> TransactionImportFuture {
Box::pin(future::ready(TransactionImport::KnownGood))
}
fn on_broadcasted(&self, _: HashMap<H, Vec<String>>) {}
fn transaction(&self, _h: &H) -> Option<B::Extrinsic> {
None
}
}
/// Sync operation mode.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum SyncMode {
@@ -394,132 +323,6 @@ impl NetworkConfiguration {
}
}
/// Configuration for a set of nodes.
#[derive(Clone, Debug)]
pub struct SetConfig {
/// Maximum allowed number of incoming substreams related to this set.
pub in_peers: u32,
/// Number of outgoing substreams related to this set that we're trying to maintain.
pub out_peers: u32,
/// List of reserved node addresses.
pub reserved_nodes: Vec<MultiaddrWithPeerId>,
/// Whether nodes that aren't in [`SetConfig::reserved_nodes`] are accepted or automatically
/// refused.
pub non_reserved_mode: NonReservedPeerMode,
}
impl Default for SetConfig {
fn default() -> Self {
Self {
in_peers: 25,
out_peers: 75,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
}
}
}
/// Extension to [`SetConfig`] for sets that aren't the default set.
///
/// > **Note**: As new fields might be added in the future, please consider using the `new` method
/// > and modifiers instead of creating this struct manually.
#[derive(Clone, Debug)]
pub struct NonDefaultSetConfig {
/// Name of the notifications protocols of this set. A substream on this set will be
/// considered established once this protocol is open.
///
/// > **Note**: This field isn't present for the default set, as this is handled internally
/// > by the networking code.
pub notifications_protocol: ProtocolName,
/// If the remote reports that it doesn't support the protocol indicated in the
/// `notifications_protocol` field, then each of these fallback names will be tried one by
/// one.
///
/// If a fallback is used, it will be reported in
/// [`crate::Event::NotificationStreamOpened::negotiated_fallback`].
pub fallback_names: Vec<ProtocolName>,
/// Maximum allowed size of single notifications.
pub max_notification_size: u64,
/// Base configuration.
pub set_config: SetConfig,
}
impl NonDefaultSetConfig {
/// Creates a new [`NonDefaultSetConfig`]. Zero slots and accepts only reserved nodes.
pub fn new(notifications_protocol: ProtocolName, max_notification_size: u64) -> Self {
Self {
notifications_protocol,
max_notification_size,
fallback_names: Vec::new(),
set_config: SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Deny,
},
}
}
/// Modifies the configuration to allow non-reserved nodes.
pub fn allow_non_reserved(&mut self, in_peers: u32, out_peers: u32) {
self.set_config.in_peers = in_peers;
self.set_config.out_peers = out_peers;
self.set_config.non_reserved_mode = NonReservedPeerMode::Accept;
}
/// Add a node to the list of reserved nodes.
pub fn add_reserved(&mut self, peer: MultiaddrWithPeerId) {
self.set_config.reserved_nodes.push(peer);
}
/// Add a list of protocol names used for backward compatibility.
///
/// See the explanations in [`NonDefaultSetConfig::fallback_names`].
pub fn add_fallback_names(&mut self, fallback_names: Vec<ProtocolName>) {
self.fallback_names.extend(fallback_names);
}
}
/// Configuration for the transport layer.
#[derive(Clone, Debug)]
pub enum TransportConfig {
/// Normal transport mode.
Normal {
/// If true, the network will use mDNS to discover other libp2p nodes on the local network
/// and connect to them if they support the same chain.
enable_mdns: bool,
/// If true, allow connecting to private IPv4 addresses (as defined in
/// [RFC1918](https://tools.ietf.org/html/rfc1918)). Irrelevant for addresses that have
/// been passed in [`NetworkConfiguration::boot_nodes`].
allow_private_ipv4: bool,
},
/// Only allow connections within the same process.
/// Only addresses of the form `/memory/...` will be supported.
MemoryOnly,
}
/// The policy for connections to non-reserved peers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReservedPeerMode {
/// Accept them. This is the default.
Accept,
/// Deny them.
Deny,
}
impl NonReservedPeerMode {
/// Attempt to parse the peer mode from a string.
pub fn parse(s: &str) -> Option<Self> {
match s {
"accept" => Some(Self::Accept),
"deny" => Some(Self::Deny),
_ => None,
}
}
}
/// The configuration of a node's secret key, describing the type of key
/// and how it is obtained. A node's identity keypair is the result of
/// the evaluation of the node key configuration.
+1 -2
View File
@@ -46,7 +46,6 @@
//! active mechanism that asks nodes for the addresses they are listening on. Whenever we learn
//! of a node's address, you must call `add_self_reported_address`.
use crate::utils::LruHashSet;
use futures::prelude::*;
use futures_timer::Delay;
use ip_network::IpNetwork;
@@ -72,7 +71,7 @@ use libp2p::{
},
};
use log::{debug, error, info, trace, warn};
use sc_network_common::config::ProtocolId;
use sc_network_common::{config::ProtocolId, utils::LruHashSet};
use sp_core::hexdisplay::HexDisplay;
use std::{
cmp,
-78
View File
@@ -1,78 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Substrate network possible errors.
use crate::config::TransportConfig;
use libp2p::{Multiaddr, PeerId};
use sc_network_common::protocol::ProtocolName;
use std::fmt;
/// Result type alias for the network.
pub type Result<T> = std::result::Result<T, Error>;
/// Error type for the network.
#[derive(thiserror::Error)]
pub enum Error {
/// Io error
#[error(transparent)]
Io(#[from] std::io::Error),
/// Client error
#[error(transparent)]
Client(#[from] Box<sp_blockchain::Error>),
/// The same bootnode (based on address) is registered with two different peer ids.
#[error(
"The same bootnode (`{address}`) is registered with two different peer ids: `{first_id}` and `{second_id}`"
)]
DuplicateBootnode {
/// The address of the bootnode.
address: Multiaddr,
/// The first peer id that was found for the bootnode.
first_id: PeerId,
/// The second peer id that was found for the bootnode.
second_id: PeerId,
},
/// Prometheus metrics error.
#[error(transparent)]
Prometheus(#[from] prometheus_endpoint::PrometheusError),
/// The network addresses are invalid because they don't match the transport.
#[error(
"The following addresses are invalid because they don't match the transport: {addresses:?}"
)]
AddressesForAnotherTransport {
/// Transport used.
transport: TransportConfig,
/// The invalid addresses.
addresses: Vec<Multiaddr>,
},
/// The same request-response protocol has been registered multiple times.
#[error("Request-response protocol registered multiple times: {protocol}")]
DuplicateRequestResponseProtocol {
/// Name of the protocol registered multiple times.
protocol: ProtocolName,
},
}
// Make `Debug` use the `Display` implementation.
impl fmt::Debug for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(self, f)
}
}
+2 -11
View File
@@ -251,12 +251,9 @@ mod protocol;
mod request_responses;
mod service;
mod transport;
mod utils;
pub mod config;
pub mod error;
pub mod network_state;
pub mod transactions;
#[doc(inline)]
pub use libp2p::{multiaddr, Multiaddr, PeerId};
@@ -269,8 +266,8 @@ pub use sc_network_common::{
request_responses::{IfDisconnected, RequestFailure},
service::{
KademliaKey, NetworkBlock, NetworkDHTProvider, NetworkRequest, NetworkSigner,
NetworkStateInfo, NetworkStatus, NetworkStatusProvider, NetworkSyncForkRequest,
NetworkTransaction, Signature, SigningError,
NetworkStateInfo, NetworkStatus, NetworkStatusProvider, NetworkSyncForkRequest, Signature,
SigningError,
},
sync::{
warp::{WarpSyncPhase, WarpSyncProgress},
@@ -295,9 +292,3 @@ const MAX_CONNECTIONS_PER_PEER: usize = 2;
/// The maximum number of concurrent established connections that were incoming.
const MAX_CONNECTIONS_ESTABLISHED_INCOMING: u32 = 10_000;
/// Minimum Requirements for a Hash within Networking
pub trait ExHashT: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static {}
impl<T> ExHashT for T where T: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static
{}
+1 -1
View File
@@ -16,7 +16,6 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::utils::interval;
use fnv::FnvHashMap;
use futures::prelude::*;
use libp2p::{
@@ -33,6 +32,7 @@ use libp2p::{
Multiaddr,
};
use log::{debug, error, trace};
use sc_network_common::utils::interval;
use smallvec::SmallVec;
use std::{
collections::hash_map::Entry,
+6 -7
View File
@@ -16,10 +16,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{
config, error,
utils::{interval, LruHashSet},
};
use crate::config;
use bytes::Bytes;
use codec::{Decode, DecodeAll, Encode};
@@ -45,7 +42,8 @@ use sc_consensus::import_queue::{
BlockImportError, BlockImportStatus, IncomingBlock, RuntimeOrigin,
};
use sc_network_common::{
config::ProtocolId,
config::{NonReservedPeerMode, ProtocolId},
error,
protocol::ProtocolName,
request_responses::RequestFailure,
sync::{
@@ -57,6 +55,7 @@ use sc_network_common::{
OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PollBlockAnnounceValidation,
SyncStatus,
},
utils::{interval, LruHashSet},
};
use sp_arithmetic::traits::SaturatedConversion;
use sp_consensus::BlockOrigin;
@@ -341,7 +340,7 @@ where
bootnodes,
reserved_nodes: default_sets_reserved.clone(),
reserved_only: network_config.default_peers_set.non_reserved_mode ==
config::NonReservedPeerMode::Deny,
NonReservedPeerMode::Deny,
});
for set_cfg in &network_config.extra_sets {
@@ -352,7 +351,7 @@ where
}
let reserved_only =
set_cfg.set_config.non_reserved_mode == config::NonReservedPeerMode::Deny;
set_cfg.set_config.non_reserved_mode == NonReservedPeerMode::Deny;
sets.push(sc_peerset::SetConfig {
in_peers: set_cfg.set_config.in_peers,
@@ -36,9 +36,6 @@ pub type Message<B> = generic::Message<
<B as BlockT>::Extrinsic,
>;
/// A set of transactions.
pub type Transactions<E> = Vec<E>;
/// Remote call response.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct RemoteCallResponse {
@@ -59,7 +56,7 @@ pub struct RemoteReadResponse {
/// Generic types.
pub mod generic {
use super::{RemoteCallResponse, RemoteReadResponse, Transactions};
use super::{RemoteCallResponse, RemoteReadResponse};
use bitflags::bitflags;
use codec::{Decode, Encode, Input, Output};
use sc_client_api::StorageProof;
@@ -146,9 +143,10 @@ pub mod generic {
BlockResponse(BlockResponse<Header, Hash, Extrinsic>),
/// Block announce.
BlockAnnounce(BlockAnnounce<Header>),
/// Transactions.
Transactions(Transactions<Extrinsic>),
/// Consensus protocol message.
// NOTE: index is incremented by 1 due to transaction-related
// message that was removed
#[codec(index = 6)]
Consensus(ConsensusMessage),
/// Remote method call request.
RemoteCallRequest(RemoteCallRequest<Hash>),
+16 -61
View File
@@ -29,9 +29,8 @@
use crate::{
behaviour::{self, Behaviour, BehaviourOut},
config::{Params, TransportConfig},
config::Params,
discovery::DiscoveryConfig,
error::Error,
network_state::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
@@ -39,7 +38,7 @@ use crate::{
self, message::generic::Roles, NotificationsSink, NotifsHandlerError, PeerInfo, Protocol,
Ready,
},
transactions, transport, ExHashT, ReputationChange,
transport, ReputationChange,
};
use codec::Encode as _;
@@ -60,7 +59,8 @@ use metrics::{Histogram, HistogramVec, MetricSources, Metrics};
use parking_lot::Mutex;
use sc_consensus::{BlockImportError, BlockImportStatus, ImportQueue, Link};
use sc_network_common::{
config::MultiaddrWithPeerId,
config::{MultiaddrWithPeerId, TransportConfig},
error::Error,
protocol::{
event::{DhtEvent, Event},
ProtocolName,
@@ -73,6 +73,7 @@ use sc_network_common::{
NotificationSenderReady as NotificationSenderReadyT, Signature, SigningError,
},
sync::{SyncState, SyncStatus},
ExHashT,
};
use sc_peerset::PeersetHandle;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
@@ -101,7 +102,7 @@ mod out_events;
mod tests;
pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey};
use sc_network_common::service::{NetworkBlock, NetworkRequest, NetworkTransaction};
use sc_network_common::service::{NetworkBlock, NetworkRequest};
/// Substrate network service. Handles network IO and manages connectivity.
pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
@@ -121,7 +122,7 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
/// nodes it should be connected to or not.
peerset: PeersetHandle,
/// Channel that sends messages to the actual worker.
to_worker: TracingUnboundedSender<ServiceToWorkerMsg<B, H>>,
to_worker: TracingUnboundedSender<ServiceToWorkerMsg<B>>,
/// For each peer and protocol combination, an object that allows sending notifications to
/// that peer. Updated by the [`NetworkWorker`].
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ProtocolName), NotificationsSink>>>,
@@ -144,7 +145,7 @@ where
/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
/// for the network processing to advance. From it, you can extract a `NetworkService` using
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
pub fn new(mut params: Params<B, H, Client>) -> Result<Self, Error> {
pub fn new(mut params: Params<B, Client>) -> Result<Self, Error> {
// Private and public keys configuration.
let local_identity = params.network_config.node_key.clone().into_keypair()?;
let local_public = local_identity.public();
@@ -215,21 +216,6 @@ where
fs::create_dir_all(path)?;
}
let transactions_handler_proto = transactions::TransactionsHandlerPrototype::new(
params.protocol_id.clone(),
params
.chain
.hash(0u32.into())
.ok()
.flatten()
.expect("Genesis block exists; qed"),
params.fork_id.clone(),
);
params
.network_config
.extra_sets
.insert(0, transactions_handler_proto.set_config());
info!(
target: "sub-libp2p",
"🏷 Local node identity is: {}",
@@ -244,11 +230,8 @@ where
params.protocol_id.clone(),
&params.fork_id,
&params.network_config,
iter::once(Vec::new())
.chain(
(0..params.network_config.extra_sets.len() - 1)
.map(|_| default_notif_handshake_message.clone()),
)
(0..params.network_config.extra_sets.len())
.map(|_| default_notif_handshake_message.clone())
.collect(),
params.metrics_registry.as_ref(),
params.chain_sync,
@@ -465,13 +448,6 @@ where
_marker: PhantomData,
});
let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
service.clone(),
params.transaction_pool,
params.metrics_registry.as_ref(),
)?;
(params.transactions_handler_executor)(tx_handler.run().boxed());
Ok(NetworkWorker {
external_addresses,
num_connected,
@@ -482,9 +458,9 @@ where
from_service,
event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
peers_notifications_sinks,
tx_handler_controller,
metrics,
boot_node_ids,
_marker: Default::default(),
})
}
@@ -1149,20 +1125,6 @@ where
}
}
impl<B, H> NetworkTransaction<H> for NetworkService<B, H>
where
B: BlockT + 'static,
H: ExHashT,
{
fn trigger_repropagate(&self) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateTransactions);
}
fn propagate_transaction(&self, hash: H) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateTransaction(hash));
}
}
impl<B, H> NetworkBlock<B::Hash, NumberFor<B>> for NetworkService<B, H>
where
B: BlockT + 'static,
@@ -1249,9 +1211,7 @@ impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
/// Messages sent from the `NetworkService` to the `NetworkWorker`.
///
/// Each entry corresponds to a method of `NetworkService`.
enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
PropagateTransaction(H),
PropagateTransactions,
enum ServiceToWorkerMsg<B: BlockT> {
RequestJustification(B::Hash, NumberFor<B>),
ClearJustificationRequests,
AnnounceBlock(B::Hash, Option<Vec<u8>>),
@@ -1309,7 +1269,7 @@ where
/// The import queue that was passed at initialization.
import_queue: Box<dyn ImportQueue<B>>,
/// Messages from the [`NetworkService`] that must be processed.
from_service: TracingUnboundedReceiver<ServiceToWorkerMsg<B, H>>,
from_service: TracingUnboundedReceiver<ServiceToWorkerMsg<B>>,
/// Senders for events that happen on the network.
event_streams: out_events::OutChannels,
/// Prometheus network metrics.
@@ -1319,8 +1279,9 @@ where
/// For each peer and protocol combination, an object that allows sending notifications to
/// that peer. Shared with the [`NetworkService`].
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ProtocolName), NotificationsSink>>>,
/// Controller for the handler of incoming and outgoing transactions.
tx_handler_controller: transactions::TransactionsHandlerController<H>,
/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
/// compatibility.
_marker: PhantomData<H>,
}
impl<B, H, Client> Future for NetworkWorker<B, H, Client>
@@ -1376,10 +1337,6 @@ where
.behaviour_mut()
.user_protocol_mut()
.clear_justification_requests(),
ServiceToWorkerMsg::PropagateTransaction(hash) =>
this.tx_handler_controller.propagate_transaction(hash),
ServiceToWorkerMsg::PropagateTransactions =>
this.tx_handler_controller.propagate_transactions(),
ServiceToWorkerMsg::GetValue(key) =>
this.network_service.behaviour_mut().get_value(key),
ServiceToWorkerMsg::PutValue(key, value) =>
@@ -1922,8 +1879,6 @@ where
SyncState::Downloading => true,
};
this.tx_handler_controller.set_gossip_enabled(!is_major_syncing);
this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
if let Some(metrics) = this.metrics.as_ref() {
+23 -33
View File
@@ -21,7 +21,7 @@ use crate::{config, NetworkService, NetworkWorker};
use futures::prelude::*;
use libp2p::PeerId;
use sc_network_common::{
config::{MultiaddrWithPeerId, ProtocolId},
config::{MultiaddrWithPeerId, NonDefaultSetConfig, ProtocolId, SetConfig, TransportConfig},
protocol::event::Event,
service::{NetworkEventStream, NetworkNotification, NetworkPeers, NetworkStateInfo},
};
@@ -135,12 +135,8 @@ fn build_test_full_node(
let worker = NetworkWorker::new(config::Params {
role: config::Role::Full,
executor: None,
transactions_handler_executor: Box::new(|task| {
async_std::task::spawn(task);
}),
network_config,
chain: client.clone(),
transaction_pool: Arc::new(config::EmptyTransactionPool),
protocol_id,
fork_id,
import_queue,
@@ -178,23 +174,23 @@ fn build_nodes_one_proto() -> (
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];
let (node1, events_stream1) = build_test_full_node(config::NetworkConfiguration {
extra_sets: vec![config::NonDefaultSetConfig {
extra_sets: vec![NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
set_config: Default::default(),
}],
listen_addresses: vec![listen_addr.clone()],
transport: config::TransportConfig::MemoryOnly,
transport: TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
});
let (node2, events_stream2) = build_test_full_node(config::NetworkConfiguration {
extra_sets: vec![config::NonDefaultSetConfig {
extra_sets: vec![NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
set_config: config::SetConfig {
set_config: SetConfig {
reserved_nodes: vec![MultiaddrWithPeerId {
multiaddr: listen_addr,
peer_id: node1.local_peer_id(),
@@ -203,7 +199,7 @@ fn build_nodes_one_proto() -> (
},
}],
listen_addresses: vec![],
transport: config::TransportConfig::MemoryOnly,
transport: TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
});
@@ -368,13 +364,13 @@ fn lots_of_incoming_peers_works() {
let (main_node, _) = build_test_full_node(config::NetworkConfiguration {
listen_addresses: vec![listen_addr.clone()],
extra_sets: vec![config::NonDefaultSetConfig {
extra_sets: vec![NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
set_config: config::SetConfig { in_peers: u32::MAX, ..Default::default() },
set_config: SetConfig { in_peers: u32::MAX, ..Default::default() },
}],
transport: config::TransportConfig::MemoryOnly,
transport: TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
});
@@ -387,11 +383,11 @@ fn lots_of_incoming_peers_works() {
for _ in 0..32 {
let (_dialing_node, event_stream) = build_test_full_node(config::NetworkConfiguration {
listen_addresses: vec![],
extra_sets: vec![config::NonDefaultSetConfig {
extra_sets: vec![NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
set_config: config::SetConfig {
set_config: SetConfig {
reserved_nodes: vec![MultiaddrWithPeerId {
multiaddr: listen_addr.clone(),
peer_id: main_node_peer_id,
@@ -399,7 +395,7 @@ fn lots_of_incoming_peers_works() {
..Default::default()
},
}],
transport: config::TransportConfig::MemoryOnly,
transport: TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
});
@@ -504,23 +500,23 @@ fn fallback_name_working() {
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];
let (node1, mut events_stream1) = build_test_full_node(config::NetworkConfiguration {
extra_sets: vec![config::NonDefaultSetConfig {
extra_sets: vec![NonDefaultSetConfig {
notifications_protocol: NEW_PROTOCOL_NAME.into(),
fallback_names: vec![PROTOCOL_NAME.into()],
max_notification_size: 1024 * 1024,
set_config: Default::default(),
}],
listen_addresses: vec![listen_addr.clone()],
transport: config::TransportConfig::MemoryOnly,
transport: TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
});
let (_, mut events_stream2) = build_test_full_node(config::NetworkConfiguration {
extra_sets: vec![config::NonDefaultSetConfig {
extra_sets: vec![NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
set_config: config::SetConfig {
set_config: SetConfig {
reserved_nodes: vec![MultiaddrWithPeerId {
multiaddr: listen_addr,
peer_id: node1.local_peer_id(),
@@ -529,7 +525,7 @@ fn fallback_name_working() {
},
}],
listen_addresses: vec![],
transport: config::TransportConfig::MemoryOnly,
transport: TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
});
@@ -572,7 +568,7 @@ fn ensure_listen_addresses_consistent_with_transport_memory() {
let _ = build_test_full_node(config::NetworkConfiguration {
listen_addresses: vec![listen_addr.clone()],
transport: config::TransportConfig::MemoryOnly,
transport: TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None)
});
}
@@ -599,7 +595,7 @@ fn ensure_boot_node_addresses_consistent_with_transport_memory() {
let _ = build_test_full_node(config::NetworkConfiguration {
listen_addresses: vec![listen_addr.clone()],
transport: config::TransportConfig::MemoryOnly,
transport: TransportConfig::MemoryOnly,
boot_nodes: vec![boot_node],
..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None)
});
@@ -632,11 +628,8 @@ fn ensure_reserved_node_addresses_consistent_with_transport_memory() {
let _ = build_test_full_node(config::NetworkConfiguration {
listen_addresses: vec![listen_addr.clone()],
transport: config::TransportConfig::MemoryOnly,
default_peers_set: config::SetConfig {
reserved_nodes: vec![reserved_node],
..Default::default()
},
transport: TransportConfig::MemoryOnly,
default_peers_set: SetConfig { reserved_nodes: vec![reserved_node], ..Default::default() },
..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None)
});
}
@@ -652,10 +645,7 @@ fn ensure_reserved_node_addresses_consistent_with_transport_not_memory() {
let _ = build_test_full_node(config::NetworkConfiguration {
listen_addresses: vec![listen_addr.clone()],
default_peers_set: config::SetConfig {
reserved_nodes: vec![reserved_node],
..Default::default()
},
default_peers_set: SetConfig { reserved_nodes: vec![reserved_node], ..Default::default() },
..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None)
});
}
@@ -668,7 +658,7 @@ fn ensure_public_addresses_consistent_with_transport_memory() {
let _ = build_test_full_node(config::NetworkConfiguration {
listen_addresses: vec![listen_addr.clone()],
transport: config::TransportConfig::MemoryOnly,
transport: TransportConfig::MemoryOnly,
public_addresses: vec![public_address],
..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None)
});
@@ -1,491 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Transactions handling to plug on top of the network service.
//!
//! Usage:
//!
//! - Use [`TransactionsHandlerPrototype::new`] to create a prototype.
//! - Pass the return value of [`TransactionsHandlerPrototype::set_config`] to the network
//! configuration as an extra peers set.
//! - Use [`TransactionsHandlerPrototype::build`] then [`TransactionsHandler::run`] to obtain a
//! `Future` that processes transactions.
use crate::{
config::{self, TransactionImport, TransactionImportFuture, TransactionPool},
error,
protocol::message,
service::NetworkService,
utils::{interval, LruHashSet},
ExHashT,
};
use codec::{Decode, Encode};
use futures::{channel::mpsc, prelude::*, stream::FuturesUnordered};
use libp2p::{multiaddr, PeerId};
use log::{debug, trace, warn};
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network_common::{
config::ProtocolId,
protocol::{
event::{Event, ObservedRole},
ProtocolName,
},
service::{NetworkEventStream, NetworkNotification, NetworkPeers},
};
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{hash_map::Entry, HashMap},
iter,
num::NonZeroUsize,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::Poll,
time,
};
/// Interval at which we propagate transactions;
const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900);
/// Maximum number of known transaction hashes to keep for a peer.
///
/// This should be approx. 2 blocks full of transactions for the network to function properly.
const MAX_KNOWN_TRANSACTIONS: usize = 10240; // ~300kb per peer + overhead.
/// Maximum allowed size for a transactions notification.
const MAX_TRANSACTIONS_SIZE: u64 = 16 * 1024 * 1024;
/// Maximum number of transaction validation request we keep at any moment.
const MAX_PENDING_TRANSACTIONS: usize = 8192;
mod rep {
use sc_peerset::ReputationChange as Rep;
/// Reputation change when a peer sends us any transaction.
///
/// This forces node to verify it, thus the negative value here. Once transaction is verified,
/// reputation change should be refunded with `ANY_TRANSACTION_REFUND`
pub const ANY_TRANSACTION: Rep = Rep::new(-(1 << 4), "Any transaction");
/// Reputation change when a peer sends us any transaction that is not invalid.
pub const ANY_TRANSACTION_REFUND: Rep = Rep::new(1 << 4, "Any transaction (refund)");
/// Reputation change when a peer sends us an transaction that we didn't know about.
pub const GOOD_TRANSACTION: Rep = Rep::new(1 << 7, "Good transaction");
/// Reputation change when a peer sends us a bad transaction.
pub const BAD_TRANSACTION: Rep = Rep::new(-(1 << 12), "Bad transaction");
}
struct Metrics {
propagated_transactions: Counter<U64>,
}
impl Metrics {
fn register(r: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
propagated_transactions: register(
Counter::new(
"substrate_sync_propagated_transactions",
"Number of transactions propagated to at least one peer",
)?,
r,
)?,
})
}
}
#[pin_project::pin_project]
struct PendingTransaction<H> {
#[pin]
validation: TransactionImportFuture,
tx_hash: H,
}
impl<H: ExHashT> Future for PendingTransaction<H> {
type Output = (H, TransactionImport);
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
if let Poll::Ready(import_result) = Pin::new(&mut this.validation).poll_unpin(cx) {
return Poll::Ready((this.tx_hash.clone(), import_result))
}
Poll::Pending
}
}
/// Prototype for a [`TransactionsHandler`].
pub struct TransactionsHandlerPrototype {
protocol_name: ProtocolName,
fallback_protocol_names: Vec<ProtocolName>,
}
impl TransactionsHandlerPrototype {
/// Create a new instance.
pub fn new<Hash: AsRef<[u8]>>(
protocol_id: ProtocolId,
genesis_hash: Hash,
fork_id: Option<String>,
) -> Self {
let genesis_hash = genesis_hash.as_ref();
let protocol_name = if let Some(fork_id) = fork_id {
format!("/{}/{}/transactions/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
} else {
format!("/{}/transactions/1", array_bytes::bytes2hex("", genesis_hash))
};
let legacy_protocol_name = format!("/{}/transactions/1", protocol_id.as_ref());
Self {
protocol_name: protocol_name.into(),
fallback_protocol_names: iter::once(legacy_protocol_name.into()).collect(),
}
}
/// Returns the configuration of the set to put in the network configuration.
pub fn set_config(&self) -> config::NonDefaultSetConfig {
config::NonDefaultSetConfig {
notifications_protocol: self.protocol_name.clone(),
fallback_names: self.fallback_protocol_names.clone(),
max_notification_size: MAX_TRANSACTIONS_SIZE,
set_config: config::SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: config::NonReservedPeerMode::Deny,
},
}
}
/// Turns the prototype into the actual handler. Returns a controller that allows controlling
/// the behaviour of the handler while it's running.
///
/// Important: the transactions handler is initially disabled and doesn't gossip transactions.
/// You must call [`TransactionsHandlerController::set_gossip_enabled`] to enable it.
pub fn build<B: BlockT + 'static, H: ExHashT>(
self,
service: Arc<NetworkService<B, H>>,
transaction_pool: Arc<dyn TransactionPool<H, B>>,
metrics_registry: Option<&Registry>,
) -> error::Result<(TransactionsHandler<B, H>, TransactionsHandlerController<H>)> {
let event_stream = service.event_stream("transactions-handler");
let (to_handler, from_controller) = mpsc::unbounded();
let gossip_enabled = Arc::new(AtomicBool::new(false));
let handler = TransactionsHandler {
protocol_name: self.protocol_name,
propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)),
pending_transactions: FuturesUnordered::new(),
pending_transactions_peers: HashMap::new(),
gossip_enabled: gossip_enabled.clone(),
service,
event_stream,
peers: HashMap::new(),
transaction_pool,
from_controller,
metrics: if let Some(r) = metrics_registry {
Some(Metrics::register(r)?)
} else {
None
},
};
let controller = TransactionsHandlerController { to_handler, gossip_enabled };
Ok((handler, controller))
}
}
/// Controls the behaviour of a [`TransactionsHandler`] it is connected to.
pub struct TransactionsHandlerController<H: ExHashT> {
to_handler: mpsc::UnboundedSender<ToHandler<H>>,
gossip_enabled: Arc<AtomicBool>,
}
impl<H: ExHashT> TransactionsHandlerController<H> {
/// Controls whether transactions are being gossiped on the network.
pub fn set_gossip_enabled(&mut self, enabled: bool) {
self.gossip_enabled.store(enabled, Ordering::Relaxed);
}
/// You may call this when new transactions are imported by the transaction pool.
///
/// All transactions will be fetched from the `TransactionPool` that was passed at
/// initialization as part of the configuration and propagated to peers.
pub fn propagate_transactions(&self) {
let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransactions);
}
/// You must call when new a transaction is imported by the transaction pool.
///
/// This transaction will be fetched from the `TransactionPool` that was passed at
/// initialization as part of the configuration and propagated to peers.
pub fn propagate_transaction(&self, hash: H) {
let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransaction(hash));
}
}
enum ToHandler<H: ExHashT> {
PropagateTransactions,
PropagateTransaction(H),
}
/// Handler for transactions. Call [`TransactionsHandler::run`] to start the processing.
pub struct TransactionsHandler<B: BlockT + 'static, H: ExHashT> {
protocol_name: ProtocolName,
/// Interval at which we call `propagate_transactions`.
propagate_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
/// Pending transactions verification tasks.
pending_transactions: FuturesUnordered<PendingTransaction<H>>,
/// As multiple peers can send us the same transaction, we group
/// these peers using the transaction hash while the transaction is
/// imported. This prevents that we import the same transaction
/// multiple times concurrently.
pending_transactions_peers: HashMap<H, Vec<PeerId>>,
/// Network service to use to send messages and manage peers.
service: Arc<NetworkService<B, H>>,
/// Stream of networking events.
event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>,
// All connected peers
peers: HashMap<PeerId, Peer<H>>,
transaction_pool: Arc<dyn TransactionPool<H, B>>,
gossip_enabled: Arc<AtomicBool>,
from_controller: mpsc::UnboundedReceiver<ToHandler<H>>,
/// Prometheus metrics.
metrics: Option<Metrics>,
}
/// Peer information
#[derive(Debug)]
struct Peer<H: ExHashT> {
/// Holds a set of transactions known to this peer.
known_transactions: LruHashSet<H>,
role: ObservedRole,
}
impl<B: BlockT + 'static, H: ExHashT> TransactionsHandler<B, H> {
/// Turns the [`TransactionsHandler`] into a future that should run forever and not be
/// interrupted.
pub async fn run(mut self) {
loop {
futures::select! {
_ = self.propagate_timeout.next().fuse() => {
self.propagate_transactions();
},
(tx_hash, result) = self.pending_transactions.select_next_some() => {
if let Some(peers) = self.pending_transactions_peers.remove(&tx_hash) {
peers.into_iter().for_each(|p| self.on_handle_transaction_import(p, result));
} else {
warn!(target: "sub-libp2p", "Inconsistent state, no peers for pending transaction!");
}
},
network_event = self.event_stream.next().fuse() => {
if let Some(network_event) = network_event {
self.handle_network_event(network_event).await;
} else {
// Networking has seemingly closed. Closing as well.
return;
}
},
message = self.from_controller.select_next_some().fuse() => {
match message {
ToHandler::PropagateTransaction(hash) => self.propagate_transaction(&hash),
ToHandler::PropagateTransactions => self.propagate_transactions(),
}
},
}
}
}
async fn handle_network_event(&mut self, event: Event) {
match event {
Event::Dht(_) => {},
Event::SyncConnected { remote } => {
let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
.collect::<multiaddr::Multiaddr>();
let result = self.service.add_peers_to_reserved_set(
self.protocol_name.clone(),
iter::once(addr).collect(),
);
if let Err(err) = result {
log::error!(target: "sync", "Add reserved peer failed: {}", err);
}
},
Event::SyncDisconnected { remote } => {
self.service.remove_peers_from_reserved_set(
self.protocol_name.clone(),
iter::once(remote).collect(),
);
},
Event::NotificationStreamOpened { remote, protocol, role, .. }
if protocol == self.protocol_name =>
{
let _was_in = self.peers.insert(
remote,
Peer {
known_transactions: LruHashSet::new(
NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS).expect("Constant is nonzero"),
),
role,
},
);
debug_assert!(_was_in.is_none());
},
Event::NotificationStreamClosed { remote, protocol }
if protocol == self.protocol_name =>
{
let _peer = self.peers.remove(&remote);
debug_assert!(_peer.is_some());
},
Event::NotificationsReceived { remote, messages } => {
for (protocol, message) in messages {
if protocol != self.protocol_name {
continue
}
if let Ok(m) = <message::Transactions<B::Extrinsic> as Decode>::decode(
&mut message.as_ref(),
) {
self.on_transactions(remote, m);
} else {
warn!(target: "sub-libp2p", "Failed to decode transactions list");
}
}
},
// Not our concern.
Event::NotificationStreamOpened { .. } | Event::NotificationStreamClosed { .. } => {},
}
}
/// Called when peer sends us new transactions
fn on_transactions(&mut self, who: PeerId, transactions: message::Transactions<B::Extrinsic>) {
// Accept transactions only when enabled
if !self.gossip_enabled.load(Ordering::Relaxed) {
trace!(target: "sync", "{} Ignoring transactions while disabled", who);
return
}
trace!(target: "sync", "Received {} transactions from {}", transactions.len(), who);
if let Some(ref mut peer) = self.peers.get_mut(&who) {
for t in transactions {
if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS {
debug!(
target: "sync",
"Ignoring any further transactions that exceed `MAX_PENDING_TRANSACTIONS`({}) limit",
MAX_PENDING_TRANSACTIONS,
);
break
}
let hash = self.transaction_pool.hash_of(&t);
peer.known_transactions.insert(hash.clone());
self.service.report_peer(who, rep::ANY_TRANSACTION);
match self.pending_transactions_peers.entry(hash.clone()) {
Entry::Vacant(entry) => {
self.pending_transactions.push(PendingTransaction {
validation: self.transaction_pool.import(t),
tx_hash: hash,
});
entry.insert(vec![who]);
},
Entry::Occupied(mut entry) => {
entry.get_mut().push(who);
},
}
}
}
}
fn on_handle_transaction_import(&mut self, who: PeerId, import: TransactionImport) {
match import {
TransactionImport::KnownGood =>
self.service.report_peer(who, rep::ANY_TRANSACTION_REFUND),
TransactionImport::NewGood => self.service.report_peer(who, rep::GOOD_TRANSACTION),
TransactionImport::Bad => self.service.report_peer(who, rep::BAD_TRANSACTION),
TransactionImport::None => {},
}
}
/// Propagate one transaction.
pub fn propagate_transaction(&mut self, hash: &H) {
// Accept transactions only when enabled
if !self.gossip_enabled.load(Ordering::Relaxed) {
return
}
debug!(target: "sync", "Propagating transaction [{:?}]", hash);
if let Some(transaction) = self.transaction_pool.transaction(hash) {
let propagated_to = self.do_propagate_transactions(&[(hash.clone(), transaction)]);
self.transaction_pool.on_broadcasted(propagated_to);
}
}
fn do_propagate_transactions(
&mut self,
transactions: &[(H, B::Extrinsic)],
) -> HashMap<H, Vec<String>> {
let mut propagated_to = HashMap::<_, Vec<_>>::new();
let mut propagated_transactions = 0;
for (who, peer) in self.peers.iter_mut() {
// never send transactions to the light node
if matches!(peer.role, ObservedRole::Light) {
continue
}
let (hashes, to_send): (Vec<_>, Vec<_>) = transactions
.iter()
.filter(|&(ref hash, _)| peer.known_transactions.insert(hash.clone()))
.cloned()
.unzip();
propagated_transactions += hashes.len();
if !to_send.is_empty() {
for hash in hashes {
propagated_to.entry(hash).or_default().push(who.to_base58());
}
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
self.service
.write_notification(*who, self.protocol_name.clone(), to_send.encode());
}
}
if let Some(ref metrics) = self.metrics {
metrics.propagated_transactions.inc_by(propagated_transactions as _)
}
propagated_to
}
/// Call when we must propagate ready transactions to peers.
fn propagate_transactions(&mut self) {
// Accept transactions only when enabled
if !self.gossip_enabled.load(Ordering::Relaxed) {
return
}
debug!(target: "sync", "Propagating transactions");
let transactions = self.transaction_pool.transactions();
let propagated_to = self.do_propagate_transactions(&transactions);
self.transaction_pool.on_broadcasted(propagated_to);
}
}
-85
View File
@@ -1,85 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use futures::{stream::unfold, FutureExt, Stream, StreamExt};
use futures_timer::Delay;
use linked_hash_set::LinkedHashSet;
use std::{hash::Hash, num::NonZeroUsize, time::Duration};
/// Creates a stream that returns a new value every `duration`.
pub fn interval(duration: Duration) -> impl Stream<Item = ()> + Unpin {
unfold((), move |_| Delay::new(duration).map(|_| Some(((), ())))).map(drop)
}
/// Wrapper around `LinkedHashSet` with bounded growth.
///
/// In the limit, for each element inserted the oldest existing element will be removed.
#[derive(Debug, Clone)]
pub struct LruHashSet<T: Hash + Eq> {
set: LinkedHashSet<T>,
limit: NonZeroUsize,
}
impl<T: Hash + Eq> LruHashSet<T> {
/// Create a new `LruHashSet` with the given (exclusive) limit.
pub fn new(limit: NonZeroUsize) -> Self {
Self { set: LinkedHashSet::new(), limit }
}
/// Insert element into the set.
///
/// Returns `true` if this is a new element to the set, `false` otherwise.
/// Maintains the limit of the set by removing the oldest entry if necessary.
/// Inserting the same element will update its LRU position.
pub fn insert(&mut self, e: T) -> bool {
if self.set.insert(e) {
if self.set.len() == usize::from(self.limit) {
self.set.pop_front(); // remove oldest entry
}
return true
}
false
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn maintains_limit() {
let three = NonZeroUsize::new(3).unwrap();
let mut set = LruHashSet::<u8>::new(three);
// First element.
assert!(set.insert(1));
assert_eq!(vec![&1], set.set.iter().collect::<Vec<_>>());
// Second element.
assert!(set.insert(2));
assert_eq!(vec![&1, &2], set.set.iter().collect::<Vec<_>>());
// Inserting the same element updates its LRU position.
assert!(!set.insert(1));
assert_eq!(vec![&2, &1], set.set.iter().collect::<Vec<_>>());
// We reached the limit. The next element forces the oldest one out.
assert!(set.insert(3));
assert_eq!(vec![&1, &3], set.set.iter().collect::<Vec<_>>());
}
}