mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 21:11:07 +00:00
Revalidation tweak & logging for transaction pool (#6258)
* updates and logging * fix length * Update client/transaction-pool/src/lib.rs * rename * Update client/transaction-pool/src/lib.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
@@ -234,7 +234,8 @@ impl<A, B, Block, C> Proposer<B, Block, C, A>
|
||||
Either::Left((iterator, _)) => iterator,
|
||||
Either::Right(_) => {
|
||||
log::warn!(
|
||||
"Timeout fired waiting for transaction pool to be ready. Proceeding to block production anyway.",
|
||||
"Timeout fired waiting for transaction pool at block #{}. Proceeding with production.",
|
||||
self.parent_number,
|
||||
);
|
||||
self.transaction_pool.ready()
|
||||
}
|
||||
|
||||
@@ -331,6 +331,7 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
|
||||
|
||||
fn ready_at(&self, at: NumberFor<Self::Block>) -> PolledIterator<PoolApi> {
|
||||
if self.ready_poll.lock().updated_at() >= at {
|
||||
log::trace!(target: "txpool", "Transaction pool already processed block #{}", at);
|
||||
let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
|
||||
return Box::pin(futures::future::ready(iterator));
|
||||
}
|
||||
@@ -456,6 +457,8 @@ async fn prune_known_txs_for_block<Block: BlockT, Api: ChainApi<Block = Block>>(
|
||||
.map(|tx| pool.hash_of(&tx))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
log::trace!(target: "txpool", "Pruning transactions: {:?}", hashes);
|
||||
|
||||
if let Err(e) = pool.prune_known(&block_id, &hashes) {
|
||||
log::error!("Cannot prune known in the pool {:?}!", e);
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ const BACKGROUND_REVALIDATION_INTERVAL: Duration = Duration::from_millis(200);
|
||||
#[cfg(test)]
|
||||
pub const BACKGROUND_REVALIDATION_INTERVAL: Duration = Duration::from_millis(1);
|
||||
|
||||
const BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20;
|
||||
const MIN_BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20;
|
||||
|
||||
/// Payload from queue to worker.
|
||||
struct WorkerPayload<Api: ChainApi> {
|
||||
@@ -68,13 +68,20 @@ async fn batch_revalidate<Api: ChainApi>(
|
||||
let mut invalid_hashes = Vec::new();
|
||||
let mut revalidated = HashMap::new();
|
||||
|
||||
for ext_hash in batch {
|
||||
let ext = match pool.validated_pool().ready_by_hash(&ext_hash) {
|
||||
Some(ext) => ext,
|
||||
None => continue,
|
||||
};
|
||||
let validation_results = futures::future::join_all(
|
||||
batch.into_iter().filter_map(|ext_hash| {
|
||||
pool.validated_pool().ready_by_hash(&ext_hash).map(|ext| {
|
||||
let api = api.clone();
|
||||
async move {
|
||||
api.validate_transaction(&BlockId::Number(at), ext.source, ext.data.clone())
|
||||
.map(|validation_result| (validation_result, ext_hash.clone(), ext)).await
|
||||
}
|
||||
})
|
||||
})
|
||||
).await;
|
||||
|
||||
match api.validate_transaction(&BlockId::Number(at), ext.source, ext.data.clone()).await {
|
||||
for (validation_result, ext_hash, ext) in validation_results {
|
||||
match validation_result {
|
||||
Ok(Err(TransactionValidityError::Invalid(err))) => {
|
||||
log::debug!(target: "txpool", "[{:?}]: Revalidation: invalid {:?}", ext_hash, err);
|
||||
invalid_hashes.push(ext_hash);
|
||||
@@ -131,7 +138,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
|
||||
|
||||
fn prepare_batch(&mut self) -> Vec<ExtrinsicHash<Api>> {
|
||||
let mut queued_exts = Vec::new();
|
||||
let mut left = BACKGROUND_REVALIDATION_BATCH_SIZE;
|
||||
let mut left = std::cmp::max(MIN_BACKGROUND_REVALIDATION_BATCH_SIZE, self.members.len() / 4);
|
||||
|
||||
// Take maximum of count transaction by order
|
||||
// which they got into the pool
|
||||
|
||||
Reference in New Issue
Block a user