Improve transaction submission (#6599)

* Improve transaction submission

Before this pr the transaction pool validated each transaction, even if
the transaction was already known to the pool. This pr changes the
behavior to first check if we are already aware of a transaction and
thus, to only validate them if we don't know them yet. However, there is
still the possibility that a given transaction is validated multiple
times. This can happen if the transaction is added the first time, but
is not yet validated and added to the validated pool.

Besides that, this pr fixes the wrong metrics of gossiped transactions
in the network. It also moves some metrics to the transaction pool api,
to better track when a transaction actually is scheduled for validation.

* Make sure we don't submit the same transaction twice from the network concurrently

* Remove added listener call

* Feedback

* Ignore banned on resubmit
This commit is contained in:
Bastian Köcher
2020-07-08 17:42:42 +02:00
committed by GitHub
parent faa72caf91
commit 94cddee160
16 changed files with 226 additions and 122 deletions
+46 -23
View File
@@ -51,7 +51,7 @@ use message::generic::{Message as GenericMessage, ConsensusMessage, Roles};
use prometheus_endpoint::{Registry, Gauge, Counter, GaugeVec, HistogramVec, PrometheusError, Opts, register, U64};
use sync::{ChainSync, SyncState};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map::Entry};
use std::sync::Arc;
use std::fmt::Write;
use std::{cmp, io, num::NonZeroUsize, pin::Pin, task::Poll, time};
@@ -199,18 +199,21 @@ impl Metrics {
}
}
struct PendingTransaction {
#[pin_project::pin_project]
struct PendingTransaction<H> {
#[pin]
validation: TransactionImportFuture,
peer_id: PeerId,
tx_hash: H,
}
impl Future for PendingTransaction {
type Output = (PeerId, TransactionImport);
impl<H: ExHashT> Future for PendingTransaction<H> {
type Output = (H, TransactionImport);
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
if let Poll::Ready(import_result) = this.validation.poll_unpin(cx) {
return Poll::Ready((this.peer_id.clone(), import_result));
let mut this = self.project();
if let Poll::Ready(import_result) = Pin::new(&mut this.validation).poll_unpin(cx) {
return Poll::Ready((this.tx_hash.clone(), import_result));
}
Poll::Pending
@@ -226,7 +229,12 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
/// Pending list of messages to return from `poll` as a priority.
pending_messages: VecDeque<CustomMessageOutcome<B>>,
/// Pending transactions verification tasks.
pending_transactions: FuturesUnordered<PendingTransaction>,
pending_transactions: FuturesUnordered<PendingTransaction<H>>,
/// As multiple peers can send us the same transaction, we group
/// these peers using the transaction hash while the transaction is
/// imported. This prevents that we import the same transaction
/// multiple times concurrently.
pending_transactions_peers: HashMap<H, Vec<PeerId>>,
config: ProtocolConfig,
genesis_hash: B::Hash,
sync: ChainSync<B>,
@@ -452,6 +460,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)),
pending_messages: VecDeque::new(),
pending_transactions: FuturesUnordered::new(),
pending_transactions_peers: HashMap::new(),
config,
context_data: ContextData {
peers: HashMap::new(),
@@ -1162,7 +1171,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
fn on_transactions(
&mut self,
who: PeerId,
transactions: message::Transactions<B::Extrinsic>
transactions: message::Transactions<B::Extrinsic>,
) {
// sending transaction to light node is considered a bad behavior
if !self.config.roles.is_full() {
@@ -1191,14 +1200,22 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
let hash = self.transaction_pool.hash_of(&t);
peer.known_transactions.insert(hash);
peer.known_transactions.insert(hash.clone());
self.peerset_handle.report_peer(who.clone(), rep::ANY_TRANSACTION);
self.pending_transactions.push(PendingTransaction {
peer_id: who.clone(),
validation: self.transaction_pool.import(t),
});
match self.pending_transactions_peers.entry(hash.clone()) {
Entry::Vacant(entry) => {
self.pending_transactions.push(PendingTransaction {
validation: self.transaction_pool.import(t),
tx_hash: hash,
});
entry.insert(vec![who.clone()]);
},
Entry::Occupied(mut entry) => {
entry.get_mut().push(who.clone());
}
}
}
}
}
@@ -1232,7 +1249,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
&mut self,
transactions: &[(H, B::Extrinsic)],
) -> HashMap<H, Vec<String>> {
let mut propagated_to = HashMap::new();
let mut propagated_to = HashMap::<_, Vec<_>>::new();
let mut propagated_transactions = 0;
for (who, peer) in self.context_data.peers.iter_mut() {
// never send transactions to the light node
if !peer.info.roles.is_full() {
@@ -1245,11 +1264,13 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
.cloned()
.unzip();
propagated_transactions += hashes.len();
if !to_send.is_empty() {
for hash in hashes {
propagated_to
.entry(hash)
.or_insert_with(Vec::new)
.or_default()
.push(who.to_base58());
}
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
@@ -1264,10 +1285,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}
if propagated_to.len() > 0 {
if let Some(ref metrics) = self.metrics {
metrics.propagated_transactions.inc();
}
if let Some(ref metrics) = self.metrics {
metrics.propagated_transactions.inc_by(propagated_transactions as _)
}
propagated_to
@@ -2017,8 +2036,12 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
};
self.pending_messages.push_back(event);
}
if let Poll::Ready(Some((peer_id, result))) = self.pending_transactions.poll_next_unpin(cx) {
self.on_handle_transaction_import(peer_id, result);
if let Poll::Ready(Some((tx_hash, result))) = self.pending_transactions.poll_next_unpin(cx) {
if let Some(peers) = self.pending_transactions_peers.remove(&tx_hash) {
peers.into_iter().for_each(|p| self.on_handle_transaction_import(p, result));
} else {
warn!(target: "sub-libp2p", "Inconsistent state, no peers for pending transaction!");
}
}
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));