mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-06 06:48:01 +00:00
Improve logging for transaction pool (#6152)
* improve logging * Update client/transaction-pool/graph/src/validated_pool.rs Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com> * address review and make uniform Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
This commit is contained in:
@@ -18,7 +18,7 @@
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use std::{
|
||||
collections::HashMap, hash,
|
||||
collections::HashMap, hash, fmt::Debug,
|
||||
};
|
||||
use linked_hash_map::LinkedHashMap;
|
||||
use serde::Serialize;
|
||||
@@ -27,7 +27,7 @@ use log::{debug, trace, warn};
|
||||
use sp_runtime::traits;
|
||||
|
||||
/// Extrinsic pool default listener.
|
||||
pub struct Listener<H: hash::Hash + Eq, C: ChainApi> {
|
||||
pub struct Listener<H: hash::Hash + Eq + Debug, C: ChainApi> {
|
||||
watchers: HashMap<H, watcher::Sender<H, BlockHash<C>>>,
|
||||
finality_watchers: LinkedHashMap<BlockHash<C>, Vec<H>>,
|
||||
}
|
||||
@@ -35,7 +35,7 @@ pub struct Listener<H: hash::Hash + Eq, C: ChainApi> {
|
||||
/// Maximum number of blocks awaiting finality at any time.
|
||||
const MAX_FINALITY_WATCHERS: usize = 512;
|
||||
|
||||
impl<H: hash::Hash + Eq, C: ChainApi> Default for Listener<H, C> {
|
||||
impl<H: hash::Hash + Eq + Debug, C: ChainApi> Default for Listener<H, C> {
|
||||
fn default() -> Self {
|
||||
Listener {
|
||||
watchers: Default::default(),
|
||||
@@ -74,7 +74,7 @@ impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
|
||||
|
||||
/// New transaction was added to the ready pool or promoted from the future pool.
|
||||
pub fn ready(&mut self, tx: &H, old: Option<&H>) {
|
||||
trace!(target: "txpool", "[{:?}] Ready (replaced: {:?})", tx, old);
|
||||
trace!(target: "txpool", "[{:?}] Ready (replaced with {:?})", tx, old);
|
||||
self.fire(tx, |watcher| watcher.ready());
|
||||
if let Some(old) = old {
|
||||
self.fire(old, |watcher| watcher.usurped(tx.clone()));
|
||||
@@ -89,7 +89,7 @@ impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
|
||||
|
||||
/// Transaction was dropped from the pool because of the limit.
|
||||
pub fn dropped(&mut self, tx: &H, by: Option<&H>) {
|
||||
trace!(target: "txpool", "[{:?}] Dropped (replaced by {:?})", tx, by);
|
||||
trace!(target: "txpool", "[{:?}] Dropped (replaced with {:?})", tx, by);
|
||||
self.fire(tx, |watcher| match by {
|
||||
Some(t) => watcher.usurped(t.clone()),
|
||||
None => watcher.dropped(),
|
||||
@@ -99,9 +99,9 @@ impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
|
||||
/// Transaction was removed as invalid.
|
||||
pub fn invalid(&mut self, tx: &H, warn: bool) {
|
||||
if warn {
|
||||
warn!(target: "txpool", "Extrinsic invalid: {:?}", tx);
|
||||
warn!(target: "txpool", "[{:?}] Extrinsic invalid", tx);
|
||||
} else {
|
||||
debug!(target: "txpool", "Extrinsic invalid: {:?}", tx);
|
||||
debug!(target: "txpool", "[{:?}] Extrinsic invalid", tx);
|
||||
}
|
||||
self.fire(tx, |watcher| watcher.invalid());
|
||||
}
|
||||
@@ -134,6 +134,7 @@ impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
|
||||
pub fn finalized(&mut self, block_hash: BlockHash<C>) {
|
||||
if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
|
||||
for hash in hashes {
|
||||
log::debug!(target: "txpool", "[{:?}] Sent finalization event (block {:?})", hash, block_hash);
|
||||
self.fire(&hash, |s| s.finalized(block_hash))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,7 +27,6 @@ use crate::listener::Listener;
|
||||
use crate::rotator::PoolRotator;
|
||||
use crate::watcher::Watcher;
|
||||
use serde::Serialize;
|
||||
use log::{debug, warn};
|
||||
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use sp_runtime::{
|
||||
@@ -189,11 +188,11 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
let ready_limit = &self.options.ready;
|
||||
let future_limit = &self.options.future;
|
||||
|
||||
debug!(target: "txpool", "Pool Status: {:?}", status);
|
||||
log::debug!(target: "txpool", "Pool Status: {:?}", status);
|
||||
if ready_limit.is_exceeded(status.ready, status.ready_bytes)
|
||||
|| future_limit.is_exceeded(status.future, status.future_bytes)
|
||||
{
|
||||
debug!(
|
||||
log::debug!(
|
||||
target: "txpool",
|
||||
"Enforcing limits ({}/{}kB ready, {}/{}kB future",
|
||||
ready_limit.count, ready_limit.total_bytes / 1024,
|
||||
@@ -209,8 +208,11 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
self.rotator.ban(&Instant::now(), removed.iter().map(|x| x.clone()));
|
||||
removed
|
||||
};
|
||||
if !removed.is_empty() {
|
||||
log::debug!(target: "txpool", "Enforcing limits: {} dropped", removed.len());
|
||||
}
|
||||
|
||||
// run notifications
|
||||
debug!(target: "txpool", "Enforcing limits: {} dropped", removed.len());
|
||||
let mut listener = self.listener.write();
|
||||
for h in &removed {
|
||||
listener.dropped(h, None);
|
||||
@@ -324,7 +326,7 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
// we do not want to fail if single transaction import has failed
|
||||
// nor we do want to propagate this error, because it could tx unknown to caller
|
||||
// => let's just notify listeners (and issue debug message)
|
||||
warn!(
|
||||
log::warn!(
|
||||
target: "txpool",
|
||||
"[{:?}] Removing invalid transaction from update: {}",
|
||||
hash,
|
||||
@@ -531,14 +533,14 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
return vec![];
|
||||
}
|
||||
|
||||
debug!(target: "txpool", "Removing invalid transactions: {:?}", hashes);
|
||||
log::debug!(target: "txpool", "Removing invalid transactions: {:?}", hashes);
|
||||
|
||||
// temporarily ban invalid transactions
|
||||
self.rotator.ban(&Instant::now(), hashes.iter().cloned());
|
||||
|
||||
let invalid = self.pool.write().remove_subtree(hashes);
|
||||
|
||||
debug!(target: "txpool", "Removed invalid transactions: {:?}", invalid);
|
||||
log::debug!(target: "txpool", "Removed invalid transactions: {:?}", invalid);
|
||||
|
||||
let mut listener = self.listener.write();
|
||||
for tx in &invalid {
|
||||
@@ -560,7 +562,7 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
|
||||
/// Notify all watchers that transactions in the block with hash have been finalized
|
||||
pub async fn on_block_finalized(&self, block_hash: BlockHash<B>) -> Result<(), B::Error> {
|
||||
debug!(target: "txpool", "Attempting to notify watchers of finalization for {}", block_hash);
|
||||
log::trace!(target: "txpool", "Attempting to notify watchers of finalization for {}", block_hash);
|
||||
self.listener.write().finalized(block_hash);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -178,7 +178,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
|
||||
for ext_hash in transactions {
|
||||
// we don't add something that already scheduled for revalidation
|
||||
if self.members.contains_key(&ext_hash) {
|
||||
log::debug!(
|
||||
log::trace!(
|
||||
target: "txpool",
|
||||
"[{:?}] Skipped adding for revalidation: Already there.",
|
||||
ext_hash,
|
||||
@@ -245,6 +245,16 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
|
||||
Some(worker_payload) => {
|
||||
this.best_block = worker_payload.at;
|
||||
this.push(worker_payload);
|
||||
|
||||
if this.members.len() > 0 {
|
||||
log::debug!(
|
||||
target: "txpool",
|
||||
"Updated revalidation queue at {}. Transactions: {:?}",
|
||||
this.best_block,
|
||||
this.members,
|
||||
);
|
||||
}
|
||||
|
||||
continue;
|
||||
},
|
||||
// R.I.P. worker!
|
||||
@@ -326,7 +336,7 @@ where
|
||||
/// revalidation is actually done.
|
||||
pub async fn revalidate_later(&self, at: NumberFor<Api>, transactions: Vec<ExHash<Api>>) {
|
||||
if transactions.len() > 0 {
|
||||
log::debug!(target: "txpool", "Added {} transactions to revalidation queue", transactions.len());
|
||||
log::debug!(target: "txpool", "Sent {} transactions to revalidation queue", transactions.len());
|
||||
}
|
||||
|
||||
if let Some(ref to_worker) = self.background {
|
||||
|
||||
Reference in New Issue
Block a user