mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
Fix revalidation not revalidating multiple times (#5065)
This commit is contained in:
@@ -16,6 +16,7 @@
|
||||
|
||||
//! Substrate transaction pool implementation.
|
||||
|
||||
#![recursion_limit="256"]
|
||||
#![warn(missing_docs)]
|
||||
#![warn(unused_extern_crates)]
|
||||
|
||||
|
||||
@@ -113,7 +113,9 @@ async fn batch_revalidate<Api: ChainApi>(
|
||||
}
|
||||
|
||||
pool.validated_pool().remove_invalid(&invalid_hashes);
|
||||
pool.resubmit(revalidated);
|
||||
if revalidated.len() > 0 {
|
||||
pool.resubmit(revalidated);
|
||||
}
|
||||
}
|
||||
|
||||
impl<Api: ChainApi> RevalidationWorker<Api> {
|
||||
@@ -149,6 +151,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
|
||||
} else {
|
||||
for xt in &to_queue {
|
||||
extrinsics.remove(xt);
|
||||
self.members.remove(xt);
|
||||
}
|
||||
}
|
||||
left -= to_queue.len();
|
||||
@@ -163,6 +166,10 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
|
||||
queued_exts
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
self.block_ordered.iter().map(|b| b.1.len()).sum()
|
||||
}
|
||||
|
||||
fn push(&mut self, worker_payload: WorkerPayload<Api>) {
|
||||
// we don't add something that already scheduled for revalidation
|
||||
let transactions = worker_payload.transactions;
|
||||
@@ -170,7 +177,15 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
|
||||
|
||||
for ext_hash in transactions {
|
||||
// we don't add something that already scheduled for revalidation
|
||||
if self.members.contains_key(&ext_hash) { continue; }
|
||||
if self.members.contains_key(&ext_hash) {
|
||||
log::debug!(
|
||||
target: "txpool",
|
||||
"[{:?}] Skipped adding for revalidation: Already there.",
|
||||
ext_hash,
|
||||
);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
self.block_ordered.entry(block_number)
|
||||
.and_modify(|value| { value.insert(ext_hash.clone()); })
|
||||
@@ -198,7 +213,18 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
|
||||
futures::select! {
|
||||
_ = interval.next() => {
|
||||
let next_batch = this.prepare_batch();
|
||||
let batch_len = next_batch.len();
|
||||
|
||||
batch_revalidate(this.pool.clone(), this.api.clone(), this.best_block, next_batch).await;
|
||||
|
||||
if batch_len > 0 || this.len() > 0 {
|
||||
log::debug!(
|
||||
target: "txpool",
|
||||
"Revalidated {} transactions. Left in the queue for revalidation: {}.",
|
||||
batch_len,
|
||||
this.len(),
|
||||
);
|
||||
}
|
||||
},
|
||||
workload = from_queue.next() => {
|
||||
match workload {
|
||||
@@ -264,6 +290,10 @@ where
|
||||
/// If queue configured without background worker, this will resolve after
|
||||
/// revalidation is actually done.
|
||||
pub async fn revalidate_later(&self, at: NumberFor<Api>, transactions: Vec<ExHash<Api>>) {
|
||||
if transactions.len() > 0 {
|
||||
log::debug!(target: "txpool", "Added {} transactions to revalidation queue", transactions.len());
|
||||
}
|
||||
|
||||
if let Some(ref to_worker) = self.background {
|
||||
if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions }) {
|
||||
log::warn!(target: "txpool", "Failed to update background worker: {:?}", e);
|
||||
|
||||
@@ -267,6 +267,34 @@ fn should_not_retain_invalid_hashes_from_retracted() {
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_revalidate_transaction_multiple_times() {
|
||||
let xt = uxt(Alice, 209);
|
||||
|
||||
let (pool, _guard) = maintained_pool();
|
||||
|
||||
block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
|
||||
pool.api.push_block(1, vec![xt.clone()]);
|
||||
|
||||
// maintenance is in background
|
||||
block_on(pool.maintain(block_event(1)));
|
||||
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));
|
||||
|
||||
block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
|
||||
pool.api.push_block(2, vec![]);
|
||||
pool.api.add_invalid(&xt);
|
||||
|
||||
// maintenance is in background
|
||||
block_on(pool.maintain(block_event(2)));
|
||||
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));
|
||||
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_push_watchers_during_maintaince() {
|
||||
fn alice_uxt(nonce: u64) -> Extrinsic {
|
||||
|
||||
Reference in New Issue
Block a user