mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-01 12:27:56 +00:00
Fix transaction pool & network issues (#6288)
* fix & tweaks * address review * line width
This commit is contained in:
@@ -78,7 +78,9 @@ const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900);
|
||||
/// Maximim number of known block hashes to keep for a peer.
|
||||
const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead
|
||||
/// Maximim number of known extrinsic hashes to keep for a peer.
|
||||
const MAX_KNOWN_EXTRINSICS: usize = 4096; // ~128kb per peer + overhead
|
||||
///
|
||||
/// This should be approx. 2 blocks full of transactions for the network to function properly.
|
||||
const MAX_KNOWN_EXTRINSICS: usize = 10240; // ~300kb per peer + overhead.
|
||||
|
||||
/// Maximim number of transaction validation request we keep at any moment.
|
||||
const MAX_PENDING_TRANSACTIONS: usize = 8192;
|
||||
|
||||
@@ -34,8 +34,8 @@ pub mod testing;
|
||||
pub use sc_transaction_graph as txpool;
|
||||
pub use crate::api::{FullChainApi, LightChainApi};
|
||||
|
||||
use std::{collections::HashMap, sync::Arc, pin::Pin};
|
||||
use futures::{prelude::*, future::{ready, self}, channel::oneshot};
|
||||
use std::{collections::{HashMap, HashSet}, sync::Arc, pin::Pin};
|
||||
use futures::{prelude::*, future::{self, ready}, channel::oneshot};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use sp_runtime::{
|
||||
@@ -47,7 +47,7 @@ use sp_transaction_pool::{
|
||||
TransactionStatusStreamFor, MaintainedTransactionPool, PoolFuture, ChainEvent,
|
||||
TransactionSource,
|
||||
};
|
||||
use sc_transaction_graph::ChainApi;
|
||||
use sc_transaction_graph::{ChainApi, ExtrinsicHash};
|
||||
use wasm_timer::Instant;
|
||||
|
||||
use prometheus_endpoint::Registry as PrometheusRegistry;
|
||||
@@ -440,10 +440,10 @@ async fn prune_known_txs_for_block<Block: BlockT, Api: ChainApi<Block = Block>>(
|
||||
block_id: BlockId<Block>,
|
||||
api: &Api,
|
||||
pool: &sc_transaction_graph::Pool<Api>,
|
||||
) {
|
||||
) -> Vec<ExtrinsicHash<Api>> {
|
||||
// We don't query block if we won't prune anything
|
||||
if pool.validated_pool().status().is_empty() {
|
||||
return;
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let hashes = api.block_body(&block_id).await
|
||||
@@ -459,6 +459,8 @@ async fn prune_known_txs_for_block<Block: BlockT, Api: ChainApi<Block = Block>>(
|
||||
if let Err(e) = pool.prune_known(&block_id, &hashes) {
|
||||
log::error!("Cannot prune known in the pool {:?}!", e);
|
||||
}
|
||||
|
||||
hashes
|
||||
}
|
||||
|
||||
impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
|
||||
@@ -495,6 +497,10 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
|
||||
let ready_poll = self.ready_poll.clone();
|
||||
|
||||
async move {
|
||||
// We keep track of everything we prune so that later we won't add
|
||||
// tranactions with those hashes from the retracted blocks.
|
||||
let mut pruned_log = HashSet::<ExtrinsicHash<PoolApi>>::new();
|
||||
|
||||
// If there is a tree route, we use this to prune known tx based on the enacted
|
||||
// blocks.
|
||||
if let Some(ref tree_route) = tree_route {
|
||||
@@ -509,12 +515,14 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
|
||||
&*pool,
|
||||
),
|
||||
),
|
||||
).await;
|
||||
).await.into_iter().for_each(|enacted_log|{
|
||||
pruned_log.extend(enacted_log);
|
||||
})
|
||||
}
|
||||
|
||||
// If this is a new best block, we need to prune its transactions from the pool.
|
||||
if is_new_best {
|
||||
prune_known_txs_for_block(id.clone(), &*api, &*pool).await;
|
||||
pruned_log.extend(prune_known_txs_for_block(id.clone(), &*api, &*pool).await);
|
||||
}
|
||||
|
||||
if let (true, Some(tree_route)) = (next_action.resubmit, tree_route) {
|
||||
@@ -536,7 +544,21 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
|
||||
.into_iter()
|
||||
.filter(|tx| tx.is_signed().unwrap_or(true));
|
||||
|
||||
resubmit_transactions.extend(block_transactions);
|
||||
resubmit_transactions.extend(
|
||||
block_transactions.into_iter().filter(|tx| {
|
||||
let tx_hash = pool.hash_of(&tx);
|
||||
let contains = pruned_log.contains(&tx_hash);
|
||||
if !contains {
|
||||
log::debug!(
|
||||
target: "txpool",
|
||||
"[{:?}]: Resubmitting from retracted block {:?}",
|
||||
tx_hash,
|
||||
hash,
|
||||
);
|
||||
}
|
||||
!contains
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
if let Err(e) = pool.submit_at(
|
||||
|
||||
@@ -281,6 +281,25 @@ fn should_resubmit_from_retracted_during_maintenance() {
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn should_not_resubmit_from_retracted_during_maintenance_if_tx_is_also_in_enacted() {
|
||||
let xt = uxt(Alice, 209);
|
||||
|
||||
let (pool, _guard, _notifier) = maintained_pool();
|
||||
|
||||
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
|
||||
let header = pool.api.push_block(1, vec![xt.clone()]);
|
||||
let fork_header = pool.api.push_block(1, vec![xt]);
|
||||
|
||||
let event = block_event_with_retracted(header, fork_header.hash(), &*pool.api);
|
||||
|
||||
block_on(pool.maintain(event));
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_not_retain_invalid_hashes_from_retracted() {
|
||||
let xt = uxt(Alice, 209);
|
||||
|
||||
Reference in New Issue
Block a user