From 1628ba3388278457e85d4c9be85465a96694f33d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 4 Dec 2019 19:16:00 +0100 Subject: [PATCH] Revalidate some transactions on every block import. (#4292) * Revalidate some transactions on every block import. * Fix endless loop in revalidate_ready. * Clean up logging a bit. * More clean ups. * Print status after resubmitting. * Remove env_logger. * Remove redundant log. --- substrate/Cargo.lock | 1 - .../client/transaction-pool/graph/Cargo.toml | 1 - .../client/transaction-pool/graph/src/pool.rs | 39 +++++++++-- .../transaction-pool/graph/src/ready.rs | 4 +- .../graph/src/validated_pool.rs | 9 ++- .../client/transaction-pool/src/maintainer.rs | 66 ++++++++++++------- 6 files changed, 85 insertions(+), 35 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index ba45de4d29..2b7e5dffbd 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -5568,7 +5568,6 @@ dependencies = [ "assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.99.2 (registry+https://github.com/rust-lang/crates.io-index)", - "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/client/transaction-pool/graph/Cargo.toml b/substrate/client/transaction-pool/graph/Cargo.toml index 392a39ab6d..869a206007 100644 --- a/substrate/client/transaction-pool/graph/Cargo.toml +++ b/substrate/client/transaction-pool/graph/Cargo.toml @@ -16,7 +16,6 @@ txpool-api = { package = "sp-transaction-pool-api", path = "../../../primitives/ [dev-dependencies] assert_matches = "1.3.0" -env_logger = "0.7.0" codec = { package = "parity-scale-codec", version = "1.0.0" } test_runtime = { package = "substrate-test-runtime", path = "../../../test/utils/runtime" } criterion = "0.3" diff --git a/substrate/client/transaction-pool/graph/src/pool.rs b/substrate/client/transaction-pool/graph/src/pool.rs index cf5c7a5292..d29a513d97 100644 --- a/substrate/client/transaction-pool/graph/src/pool.rs +++ b/substrate/client/transaction-pool/graph/src/pool.rs @@ -175,13 +175,39 @@ impl Pool { /// /// Returns future that performs validation of all ready transactions and /// then resubmits all transactions back to the pool. - pub fn revalidate_ready(&self, at: &BlockId) -> impl Future> { + pub fn revalidate_ready( + &self, + at: &BlockId, + max: Option, + ) -> impl Future> { + use std::time::Instant; + log::debug!(target: "txpool", + "Fetching ready transactions (up to: {})", + max.map(|x| format!("{}", x)).unwrap_or_else(|| "all".into()) + ); let validated_pool = self.validated_pool.clone(); - let ready = self.validated_pool.ready().map(|tx| tx.data.clone()); + let ready = self.validated_pool.ready() + .map(|tx| tx.data.clone()) + .take(max.unwrap_or_else(usize::max_value)); + + let now = Instant::now(); self.verify(at, ready, false) - .map(move |revalidated_transactions| revalidated_transactions.map( - move |revalidated_transactions| validated_pool.resubmit(revalidated_transactions) - )) + .map(move |revalidated_transactions| { + log::debug!(target: "txpool", + "Re-verified transactions, took {} ms. Resubmitting.", + now.elapsed().as_millis() + ); + let now = Instant::now(); + let res = revalidated_transactions.map( + |revalidated_transactions| validated_pool.resubmit(revalidated_transactions) + ); + log::debug!(target: "txpool", + "Resubmitted. Took {} ms. Status: {:?}", + now.elapsed().as_millis(), + validated_pool.status() + ); + res + }) } /// Prunes known ready transactions. @@ -927,7 +953,6 @@ mod tests { #[test] fn should_handle_pruning_in_the_middle_of_import() { - let _ = env_logger::try_init(); // given let (ready, is_ready) = std::sync::mpsc::sync_channel(0); let (tx, rx) = std::sync::mpsc::sync_channel(1); @@ -1014,7 +1039,7 @@ mod tests { pool.validated_pool.api().invalidate.lock().insert(hash3); pool.validated_pool.api().clear_requirements.lock().insert(hash1); pool.validated_pool.api().add_requirements.lock().insert(hash0); - block_on(pool.revalidate_ready(&BlockId::Number(0))).unwrap(); + block_on(pool.revalidate_ready(&BlockId::Number(0), None)).unwrap(); // then // hash0 now has unsatisfied requirements => it is moved to the future queue diff --git a/substrate/client/transaction-pool/graph/src/ready.rs b/substrate/client/transaction-pool/graph/src/ready.rs index 11354e6871..a358047dd7 100644 --- a/substrate/client/transaction-pool/graph/src/ready.rs +++ b/substrate/client/transaction-pool/graph/src/ready.rs @@ -22,7 +22,7 @@ use std::{ }; use serde::Serialize; -use log::debug; +use log::trace; use parking_lot::RwLock; use sp_runtime::traits::Member; use sp_runtime::transaction_validity::{ @@ -267,7 +267,7 @@ impl ReadyTransactions { to_remove.append(&mut tx.unlocks); // add to removed - debug!(target: "txpool", "[{:?}] Removed as invalid: ", hash); + trace!(target: "txpool", "[{:?}] Removed as part of the subtree.", hash); removed.push(tx.transaction.transaction); } } diff --git a/substrate/client/transaction-pool/graph/src/validated_pool.rs b/substrate/client/transaction-pool/graph/src/validated_pool.rs index e32dac88eb..321eed0b62 100644 --- a/substrate/client/transaction-pool/graph/src/validated_pool.rs +++ b/substrate/client/transaction-pool/graph/src/validated_pool.rs @@ -106,7 +106,12 @@ impl ValidatedPool { .map(|validated_tx| self.submit_one(validated_tx)) .collect::>(); - let removed = self.enforce_limits(); + // only enforce limits if there is at least one imported transaction + let removed = if results.iter().any(|res| res.is_ok()) { + self.enforce_limits() + } else { + Default::default() + }; results.into_iter().map(|res| match res { Ok(ref hash) if removed.contains(hash) => Err(error::Error::ImmediatelyDropped.into()), @@ -236,6 +241,8 @@ impl ValidatedPool { initial_statuses.insert(removed_hash.clone(), Status::Ready); txs_to_resubmit.push((removed_hash, tx_to_resubmit)); } + // make sure to remove the hash even if it's not present in the pool any more. + updated_transactions.remove(&hash); } // if we're rejecting future transactions, then insertion order matters here: diff --git a/substrate/client/transaction-pool/src/maintainer.rs b/substrate/client/transaction-pool/src/maintainer.rs index 7c5d07e0f1..84b780f4f3 100644 --- a/substrate/client/transaction-pool/src/maintainer.rs +++ b/substrate/client/transaction-pool/src/maintainer.rs @@ -23,7 +23,7 @@ use futures::{ Future, FutureExt, future::{Either, join, ready}, }; -use log::{warn, debug}; +use log::{warn, debug, trace}; use parking_lot::Mutex; use client_api::{ @@ -74,6 +74,11 @@ where id: &BlockId, retracted: &[Block::Hash], ) -> Box + Send + Unpin> { + let now = std::time::Instant::now(); + let took = move || format!("Took {} ms", now.elapsed().as_millis()); + + let id = *id; + trace!(target: "txpool", "[{:?}] Starting pool maintainance", id); // Put transactions from retracted blocks back into the pool. let client_copy = self.client.clone(); let retracted_transactions = retracted.to_vec().into_iter() @@ -82,13 +87,14 @@ where // if signed information is not present, attempt to resubmit anyway. .filter(|tx| tx.is_signed().unwrap_or(true)); let resubmit_future = self.pool - .submit_at(id, retracted_transactions, true) - .then(|resubmit_result| ready(match resubmit_result { - Ok(_) => (), - Err(e) => { - debug!(target: "txpool", "Error re-submitting transactions: {:?}", e); - () - } + .submit_at(&id, retracted_transactions, true) + .then(move |resubmit_result| ready(match resubmit_result { + Ok(_) => trace!(target: "txpool", + "[{:?}] Re-submitting retracted done. {}", id, took() + ), + Err(e) => debug!(target: "txpool", + "[{:?}] Error re-submitting transactions: {:?}", id, e + ), })); // Avoid calling into runtime if there is nothing to prune from the pool anyway. @@ -96,28 +102,42 @@ where return Box::new(resubmit_future) } - let block = (self.client.header(*id), self.client.block_body(id)); - match block { + let block = (self.client.header(id), self.client.block_body(&id)); + let prune_future = match block { (Ok(Some(header)), Ok(Some(extrinsics))) => { let parent_id = BlockId::hash(*header.parent_hash()); let prune_future = self.pool - .prune(id, &parent_id, &extrinsics) - .then(|prune_result| ready(match prune_result { - Ok(_) => (), - Err(e) => { - warn!("Error pruning transactions: {:?}", e); - () - } + .prune(&id, &parent_id, &extrinsics) + .then(move |prune_result| ready(match prune_result { + Ok(_) => trace!(target: "txpool", + "[{:?}] Pruning done. {}", id, took() + ), + Err(e) => warn!(target: "txpool", + "[{:?}] Error pruning transactions: {:?}", id, e + ), })); - Box::new(resubmit_future.then(|_| prune_future)) + Either::Left(resubmit_future.then(|_| prune_future)) }, - (Ok(_), Ok(_)) => Box::new(resubmit_future), + (Ok(_), Ok(_)) => Either::Right(resubmit_future), err => { - warn!("Error reading block: {:?}", err); - Box::new(resubmit_future) + warn!(target: "txpool", "[{:?}] Error reading block: {:?}", id, err); + Either::Right(resubmit_future) }, - } + }; + + let revalidate_future = self.pool + .revalidate_ready(&id, Some(16)) + .then(move |result| ready(match result { + Ok(_) => debug!(target: "txpool", + "[{:?}] Revalidation done: {}", id, took() + ), + Err(e) => warn!(target: "txpool", + "[{:?}] Encountered errors while revalidating transactions: {:?}", id, e + ), + })); + + Box::new(prune_future.then(|_| revalidate_future)) } } @@ -228,7 +248,7 @@ impl LightBasicPoolMaintainer { let revalidation_status = self.revalidation_status.clone(); Either::Left(self.pool - .revalidate_ready(id) + .revalidate_ready(id, None) .map(|r| r.map_err(|e| warn!("Error revalidating known transactions: {}", e))) .map(move |_| revalidation_status.lock().clear())) },