From 679c91e18c702e921372c0d41d781a901aac5504 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Mon, 11 May 2020 11:36:00 +0300 Subject: [PATCH] Refactor network transactions handling (#5939) * change propagation * add bound --- substrate/client/network/src/config.rs | 36 +++++++---- substrate/client/network/src/protocol.rs | 76 ++++++++++++++++++++---- substrate/client/service/src/builder.rs | 1 - substrate/client/service/src/lib.rs | 68 +++++++++++---------- 4 files changed, 122 insertions(+), 59 deletions(-) diff --git a/substrate/client/network/src/config.rs b/substrate/client/network/src/config.rs index 66800aeeaf..9a979ac16d 100644 --- a/substrate/client/network/src/config.rs +++ b/substrate/client/network/src/config.rs @@ -28,14 +28,14 @@ pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multia #[doc(hidden)] pub use crate::protocol::ProtocolConfig; -use crate::{ExHashT, ReportHandle}; +use crate::ExHashT; use core::{fmt, iter}; +use futures::future; use libp2p::identity::{ed25519, Keypair}; use libp2p::wasm_ext; use libp2p::{multiaddr, Multiaddr, PeerId}; use prometheus_endpoint::Registry; -use sc_peerset::ReputationChange; use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue}; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; use std::{borrow::Cow, convert::TryFrom, future::Future, pin::Pin, str::FromStr}; @@ -167,6 +167,22 @@ impl FinalityProofRequestBuilder for DummyFinalityProofRequestBuil /// Shared finality proof request builder struct used by the queue. pub type BoxFinalityProofRequestBuilder = Box + Send + Sync>; +/// Result of the transaction import. +#[derive(Clone, Copy, Debug)] +pub enum TransactionImport { + /// Transaction is good but already known by the transaction pool. + KnownGood, + /// Transaction is good and not yet known. + NewGood, + /// Transaction is invalid. + Bad, + /// Transaction import was not performed. + None, +} + +/// Fuure resolving to transaction import result. +pub type TransactionImportFuture = Pin + Send>>; + /// Transaction pool interface pub trait TransactionPool: Send + Sync { /// Get transactions from the pool that are ready to be propagated. @@ -175,15 +191,11 @@ pub trait TransactionPool: Send + Sync { fn hash_of(&self, transaction: &B::Extrinsic) -> H; /// Import a transaction into the pool. /// - /// Peer reputation is changed by reputation_change if transaction is accepted by the pool. + /// This will return future. fn import( &self, - report_handle: ReportHandle, - who: PeerId, - reputation_change_good: ReputationChange, - reputation_change_bad: ReputationChange, transaction: B::Extrinsic, - ); + ) -> TransactionImportFuture; /// Notify the pool about transactions broadcast. fn on_broadcasted(&self, propagations: HashMap>); /// Get transaction by hash. @@ -209,12 +221,10 @@ impl TransactionPool for EmptyTransaction fn import( &self, - _report_handle: ReportHandle, - _who: PeerId, - _rep_change_good: ReputationChange, - _rep_change_bad: ReputationChange, _transaction: B::Extrinsic - ) {} + ) -> TransactionImportFuture { + Box::pin(future::ready(TransactionImport::KnownGood)) + } fn on_broadcasted(&self, _: HashMap>) {} diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index 895624f08d..56383604f8 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -17,13 +17,13 @@ use crate::{ ExHashT, chain::{Client, FinalityProofProvider}, - config::{BoxFinalityProofRequestBuilder, ProtocolId, TransactionPool}, + config::{BoxFinalityProofRequestBuilder, ProtocolId, TransactionPool, TransactionImportFuture, TransactionImport}, error, utils::interval }; use bytes::{Bytes, BytesMut}; -use futures::prelude::*; +use futures::{prelude::*, stream::FuturesUnordered}; use generic_proto::{GenericProto, GenericProtoOut}; use libp2p::{Multiaddr, PeerId}; use libp2p::core::{ConnectedPoint, connection::{ConnectionId, ListenerId}}; @@ -78,6 +78,9 @@ const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead /// Maximim number of known extrinsic hashes to keep for a peer. const MAX_KNOWN_EXTRINSICS: usize = 4096; // ~128kb per peer + overhead +/// Maximim number of transaction validation request we keep at any moment. +const MAX_PENDING_TRANSACTIONS: usize = 8192; + /// Current protocol version. pub(crate) const CURRENT_VERSION: u32 = 6; /// Lowest version we support @@ -101,6 +104,13 @@ mod rep { pub const UNEXPECTED_STATUS: Rep = Rep::new(-(1 << 20), "Unexpected status message"); /// Reputation change when we are a light client and a peer is behind us. pub const PEER_BEHIND_US_LIGHT: Rep = Rep::new(-(1 << 8), "Useless for a light peer"); + /// Reputation change when a peer sends us any extrinsic. + /// + /// This forces node to verify it, thus the negative value here. Once extrinsic is verified, + /// reputation change should be refunded with `ANY_EXTRINSIC_REFUND` + pub const ANY_EXTRINSIC: Rep = Rep::new(-(1 << 4), "Any extrinsic"); + /// Reputation change when a peer sends us any extrinsic that is not invalid. + pub const ANY_EXTRINSIC_REFUND: Rep = Rep::new(1 << 4, "Any extrinsic (refund)"); /// Reputation change when a peer sends us an extrinsic that we didn't know about. pub const GOOD_EXTRINSIC: Rep = Rep::new(1 << 7, "Good extrinsic"); /// Reputation change when a peer sends us a bad extrinsic. @@ -182,6 +192,24 @@ impl Metrics { } } +struct PendingTransaction { + validation: TransactionImportFuture, + peer_id: PeerId, +} + +impl Future for PendingTransaction { + type Output = (PeerId, TransactionImport); + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let this = Pin::into_inner(self); + if let Poll::Ready(import_result) = this.validation.poll_unpin(cx) { + return Poll::Ready((this.peer_id.clone(), import_result)); + } + + Poll::Pending + } +} + // Lock must always be taken in order declared here. pub struct Protocol { /// Interval at which we call `tick`. @@ -190,6 +218,8 @@ pub struct Protocol { propagate_timeout: Pin + Send>>, /// Pending list of messages to return from `poll` as a priority. pending_messages: VecDeque>, + /// Pending extrinsic verification tasks. + pending_transactions: FuturesUnordered, config: ProtocolConfig, genesis_hash: B::Hash, sync: ChainSync, @@ -394,6 +424,7 @@ impl Protocol { tick_timeout: Box::pin(interval(TICK_TIMEOUT)), propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)), pending_messages: VecDeque::new(), + pending_transactions: FuturesUnordered::new(), config, context_data: ContextData { peers: HashMap::new(), @@ -1118,20 +1149,37 @@ impl Protocol { trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who); if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { for t in extrinsics { + if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS { + debug!( + target: "sync", + "Ignoring any further transactions that exceed `MAX_PENDING_TRANSACTIONS`({}) limit", + MAX_PENDING_TRANSACTIONS, + ); + break; + } + let hash = self.transaction_pool.hash_of(&t); peer.known_extrinsics.insert(hash); - self.transaction_pool.import( - self.peerset_handle.clone().into(), - who.clone(), - rep::GOOD_EXTRINSIC, - rep::BAD_EXTRINSIC, - t, - ); + self.peerset_handle.report_peer(who.clone(), rep::ANY_EXTRINSIC); + + self.pending_transactions.push(PendingTransaction { + peer_id: who.clone(), + validation: self.transaction_pool.import(t), + }); } } } + fn on_handle_extrinsic_import(&mut self, who: PeerId, import: TransactionImport) { + match import { + TransactionImport::KnownGood => self.peerset_handle.report_peer(who, rep::ANY_EXTRINSIC_REFUND), + TransactionImport::NewGood => self.peerset_handle.report_peer(who, rep::GOOD_EXTRINSIC), + TransactionImport::Bad => self.peerset_handle.report_peer(who, rep::BAD_EXTRINSIC), + TransactionImport::None => {}, + } + } + /// Propagate one extrinsic. pub fn propagate_extrinsic( &mut self, @@ -1953,7 +2001,7 @@ impl NetworkBehaviour for Protocol { &mut self.context_data.stats, &mut self.context_data.peers, &id, - GenericMessage::BlockRequest(r) + GenericMessage::BlockRequest(r), ) } } @@ -1970,7 +2018,7 @@ impl NetworkBehaviour for Protocol { &mut self.context_data.stats, &mut self.context_data.peers, &id, - GenericMessage::BlockRequest(r) + GenericMessage::BlockRequest(r), ) } } @@ -1988,9 +2036,13 @@ impl NetworkBehaviour for Protocol { &mut self.context_data.stats, &mut self.context_data.peers, &id, - GenericMessage::FinalityProofRequest(r)) + GenericMessage::FinalityProofRequest(r), + ) } } + if let Poll::Ready(Some((peer_id, result))) = self.pending_transactions.poll_next_unpin(cx) { + self.on_handle_extrinsic_import(peer_id, result); + } if let Some(message) = self.pending_messages.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message)); } diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 68abf8e3cd..eb6e040cc4 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -883,7 +883,6 @@ ServiceBuilder< imports_external_transactions: !matches!(config.role, Role::Light), pool: transaction_pool.clone(), client: client.clone(), - executor: task_manager.spawn_handle(), }); let protocol_id = { diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index 1c0f8ced74..ad42a2b39f 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -50,7 +50,7 @@ use futures::{ sink::SinkExt, task::{Spawn, FutureObj, SpawnError}, }; -use sc_network::{NetworkService, network_state::NetworkState, PeerId, ReportHandle}; +use sc_network::{NetworkService, network_state::NetworkState, PeerId}; use log::{log, warn, debug, error, Level}; use codec::{Encode, Decode}; use sp_runtime::generic::BlockId; @@ -76,7 +76,10 @@ pub use sc_executor::NativeExecutionDispatch; #[doc(hidden)] pub use std::{ops::Deref, result::Result, sync::Arc}; #[doc(hidden)] -pub use sc_network::config::{FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder}; +pub use sc_network::config::{ + FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder, TransactionImport, + TransactionImportFuture, +}; pub use sc_tracing::TracingReceiver; pub use task_manager::SpawnTaskHandle; use task_manager::TaskManager; @@ -616,7 +619,6 @@ pub struct TransactionPoolAdapter { imports_external_transactions: bool, pool: Arc

, client: Arc, - executor: SpawnTaskHandle, } /// Get transactions for propagation. @@ -659,42 +661,42 @@ where fn import( &self, - report_handle: ReportHandle, - who: PeerId, - reputation_change_good: sc_network::ReputationChange, - reputation_change_bad: sc_network::ReputationChange, - transaction: B::Extrinsic - ) { + transaction: B::Extrinsic, + ) -> TransactionImportFuture { if !self.imports_external_transactions { debug!("Transaction rejected"); - return; + Box::pin(futures::future::ready(TransactionImport::None)); } let encoded = transaction.encode(); - match Decode::decode(&mut &encoded[..]) { - Ok(uxt) => { - let best_block_id = BlockId::hash(self.client.info().best_hash); - let source = sp_transaction_pool::TransactionSource::External; - let import_future = self.pool.submit_one(&best_block_id, source, uxt); - let import_future = import_future - .map(move |import_result| { - match import_result { - Ok(_) => report_handle.report_peer(who, reputation_change_good), - Err(e) => match e.into_pool_error() { - Ok(sp_transaction_pool::error::Error::AlreadyImported(_)) => (), - Ok(e) => { - report_handle.report_peer(who, reputation_change_bad); - debug!("Error adding transaction to the pool: {:?}", e) - } - Err(e) => debug!("Error converting pool error: {:?}", e), - } - } - }); - - self.executor.spawn("extrinsic-import", import_future); + let uxt = match Decode::decode(&mut &encoded[..]) { + Ok(uxt) => uxt, + Err(e) => { + debug!("Transaction invalid: {:?}", e); + return Box::pin(futures::future::ready(TransactionImport::Bad)); } - Err(e) => debug!("Error decoding transaction {}", e), - } + }; + + let best_block_id = BlockId::hash(self.client.info().best_hash); + + let import_future = self.pool.submit_one(&best_block_id, sp_transaction_pool::TransactionSource::External, uxt); + Box::pin(async move { + match import_future.await { + Ok(_) => TransactionImport::NewGood, + Err(e) => match e.into_pool_error() { + Ok(sp_transaction_pool::error::Error::AlreadyImported(_)) => TransactionImport::KnownGood, + Ok(e) => { + debug!("Error adding transaction to the pool: {:?}", e); + TransactionImport::Bad + } + Err(e) => { + debug!("Error converting pool error: {:?}", e); + // it is not bad at least, just some internal node logic error, so peer is innocent. + TransactionImport::KnownGood + } + } + } + }) } fn on_broadcasted(&self, propagations: HashMap>) {