Finalized block event triggers tx maintanance (#12305)

* finalized block event triggers tx maintanance

* tx-pool: enactment helper introduced

* tx-pool: ChainApi: added tree_route method

* enactment logic implemented + tests

Signed-off-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>

* Some additional tests

* minor improvements

* trigger CI job

* fix compilation errors

ChainApi::tree_route return type changed to Result<Option<..>>, as some
implementations (tests) are not required to provide this tree route.

* formatting

* trait removed

* implementation slightly simplified

(thanks to @koute)

* get rid of Arc<> in EnactmentState return value

* minor improvement

* Apply suggestions from code review

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Apply suggestions from code review

* comment updated + formatting

* Apply suggestions from code review

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
Co-authored-by: Davide Galassi <davxy@datawok.net>

* formatting

* finalization notification bug fix

+ new test case
+ log::warn message when finalized block is being retracted by new event

* added error message on tree_route failure

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* use provided tree_route in Finalized event

* Option removed from ChainApi::tree_route

* doc added, test and logs improved

* handle_enactment aligned with original implementation

* use async-await

* Apply suggestions from code review

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* formatting + warn->debug

* compilation error fix

* enactment_state initializers added

* enactment_state: Option removed

* manual-seal: compilation & tests fix

* manual-seal: tests fixed

* tests cleanup

* another compilation error fixed

* TreeRoute::new added

* get rid of pub hack

* one more test added

* formatting

* TreeRoute::new doc added + formatting

* Apply suggestions from code review

Co-authored-by: Davide Galassi <davxy@datawok.net>

* (bool,Option) simplified to Option

* log message improved

* yet another review suggestions applied

* get rid of hash in handle_enactment

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Update client/transaction-pool/src/lib.rs

Co-authored-by: Bastian Köcher <git@kchr.de>

* minor corrections

* EnactmentState moved to new file

* File header corrected

* error formatting aligned with codebase

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* remove commented code

* small nits

Signed-off-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>
Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
Co-authored-by: Davide Galassi <davxy@datawok.net>
Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: André Silva <andrerfosilva@gmail.com>
This commit is contained in:
Michal Kucharczyk
2022-10-11 22:20:13 +02:00
committed by GitHub
parent 023aa03fea
commit 62bca87f3a
14 changed files with 1470 additions and 195 deletions
+201 -153
View File
@@ -23,6 +23,7 @@
#![warn(unused_extern_crates)]
mod api;
mod enactment_state;
pub mod error;
mod graph;
mod metrics;
@@ -31,6 +32,8 @@ mod revalidation;
mod tests;
pub use crate::api::FullChainApi;
use async_trait::async_trait;
use enactment_state::EnactmentState;
use futures::{
channel::oneshot,
future::{self, ready},
@@ -62,6 +65,8 @@ use std::time::Instant;
use crate::metrics::MetricsLink as PrometheusMetrics;
use prometheus_endpoint::Registry as PrometheusRegistry;
use sp_blockchain::{HashAndNumber, TreeRoute};
type BoxedReadyIterator<Hash, Data> =
Box<dyn ReadyTransactions<Item = Arc<graph::base_pool::Transaction<Hash, Data>>> + Send>;
@@ -85,6 +90,7 @@ where
revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
metrics: PrometheusMetrics,
enactment_state: Arc<Mutex<EnactmentState<Block>>>,
}
struct ReadyPoll<T, Block: BlockT> {
@@ -163,7 +169,11 @@ where
PoolApi: graph::ChainApi<Block = Block> + 'static,
{
/// Create new basic transaction pool with provided api, for tests.
pub fn new_test(pool_api: Arc<PoolApi>) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
pub fn new_test(
pool_api: Arc<PoolApi>,
best_block_hash: Block::Hash,
finalized_hash: Block::Hash,
) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
let pool = Arc::new(graph::Pool::new(Default::default(), true.into(), pool_api.clone()));
let (revalidation_queue, background_task) =
revalidation::RevalidationQueue::new_background(pool_api.clone(), pool.clone());
@@ -175,6 +185,10 @@ where
revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)),
ready_poll: Default::default(),
metrics: Default::default(),
enactment_state: Arc::new(Mutex::new(EnactmentState::new(
best_block_hash,
finalized_hash,
))),
},
background_task,
)
@@ -190,6 +204,8 @@ where
revalidation_type: RevalidationType,
spawner: impl SpawnEssentialNamed,
best_block_number: NumberFor<Block>,
best_block_hash: Block::Hash,
finalized_hash: Block::Hash,
) -> Self {
let pool = Arc::new(graph::Pool::new(options, is_validator, pool_api.clone()));
let (revalidation_queue, background_task) = match revalidation_type {
@@ -217,6 +233,10 @@ where
})),
ready_poll: Arc::new(Mutex::new(ReadyPoll::new(best_block_number))),
metrics: PrometheusMetrics::new(prometheus),
enactment_state: Arc::new(Mutex::new(EnactmentState::new(
best_block_hash,
finalized_hash,
))),
}
}
@@ -358,6 +378,7 @@ where
+ sp_runtime::traits::BlockIdTo<Block>
+ sc_client_api::ExecutorProvider<Block>
+ sc_client_api::UsageProvider<Block>
+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
+ Send
+ Sync
+ 'static,
@@ -380,6 +401,8 @@ where
RevalidationType::Full,
spawner,
client.usage_info().chain.best_number,
client.usage_info().chain.best_hash,
client.usage_info().chain.finalized_hash,
));
// make transaction pool available for off-chain runtime calls.
@@ -396,7 +419,8 @@ where
Client: sp_api::ProvideRuntimeApi<Block>
+ sc_client_api::BlockBackend<Block>
+ sc_client_api::blockchain::HeaderBackend<Block>
+ sp_runtime::traits::BlockIdTo<Block>,
+ sp_runtime::traits::BlockIdTo<Block>
+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>,
Client: Send + Sync + 'static,
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
{
@@ -563,166 +587,190 @@ async fn prune_known_txs_for_block<Block: BlockT, Api: graph::ChainApi<Block = B
hashes
}
impl<PoolApi, Block> BasicPool<PoolApi, Block>
where
Block: BlockT,
PoolApi: 'static + graph::ChainApi<Block = Block>,
{
/// Handles enactment and retraction of blocks, prunes stale transactions
/// (that have already been enacted) and resubmits transactions that were
/// retracted.
async fn handle_enactment(&self, tree_route: TreeRoute<Block>) {
log::trace!(target: "txpool", "handle_enactment tree_route: {tree_route:?}");
let pool = self.pool.clone();
let api = self.api.clone();
let (hash, block_number) = match tree_route.last() {
Some(HashAndNumber { hash, number }) => (hash, number),
None => {
log::warn!(
target: "txpool",
"Skipping ChainEvent - no last block in tree route {:?}",
tree_route,
);
return
},
};
let next_action = self.revalidation_strategy.lock().next(
*block_number,
Some(std::time::Duration::from_secs(60)),
Some(20u32.into()),
);
// We keep track of everything we prune so that later we won't add
// transactions with those hashes from the retracted blocks.
let mut pruned_log = HashSet::<ExtrinsicHash<PoolApi>>::new();
// If there is a tree route, we use this to prune known tx based on the enacted
// blocks. Before pruning enacted transactions, we inform the listeners about
// retracted blocks and their transactions. This order is important, because
// if we enact and retract the same transaction at the same time, we want to
// send first the retract and than the prune event.
for retracted in tree_route.retracted() {
// notify txs awaiting finality that it has been retracted
pool.validated_pool().on_block_retracted(retracted.hash);
}
future::join_all(
tree_route
.enacted()
.iter()
.map(|h| prune_known_txs_for_block(BlockId::Hash(h.hash), &*api, &*pool)),
)
.await
.into_iter()
.for_each(|enacted_log| {
pruned_log.extend(enacted_log);
});
self.metrics
.report(|metrics| metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64));
if next_action.resubmit {
let mut resubmit_transactions = Vec::new();
for retracted in tree_route.retracted() {
let hash = retracted.hash;
let block_transactions = api
.block_body(&BlockId::hash(hash))
.await
.unwrap_or_else(|e| {
log::warn!("Failed to fetch block body: {}", e);
None
})
.unwrap_or_default()
.into_iter()
.filter(|tx| tx.is_signed().unwrap_or(true));
let mut resubmitted_to_report = 0;
resubmit_transactions.extend(block_transactions.into_iter().filter(|tx| {
let tx_hash = pool.hash_of(tx);
let contains = pruned_log.contains(&tx_hash);
// need to count all transactions, not just filtered, here
resubmitted_to_report += 1;
if !contains {
log::debug!(
target: "txpool",
"[{:?}]: Resubmitting from retracted block {:?}",
tx_hash,
hash,
);
}
!contains
}));
self.metrics.report(|metrics| {
metrics.block_transactions_resubmitted.inc_by(resubmitted_to_report)
});
}
if let Err(e) = pool
.resubmit_at(
&BlockId::Hash(*hash),
// These transactions are coming from retracted blocks, we should
// simply consider them external.
TransactionSource::External,
resubmit_transactions,
)
.await
{
log::debug!(
target: "txpool",
"[{:?}] Error re-submitting transactions: {}",
hash,
e,
)
}
}
let extra_pool = pool.clone();
// After #5200 lands, this arguably might be moved to the
// handler of "all blocks notification".
self.ready_poll
.lock()
.trigger(*block_number, move || Box::new(extra_pool.validated_pool().ready()));
if next_action.revalidate {
let hashes = pool.validated_pool().ready().map(|tx| tx.hash).collect();
self.revalidation_queue.revalidate_later(*block_number, hashes).await;
self.revalidation_strategy.lock().clear();
}
}
}
#[async_trait]
impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
where
Block: BlockT,
PoolApi: 'static + graph::ChainApi<Block = Block>,
{
fn maintain(&self, event: ChainEvent<Self::Block>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
match event {
ChainEvent::NewBestBlock { hash, tree_route } => {
let pool = self.pool.clone();
let api = self.api.clone();
async fn maintain(&self, event: ChainEvent<Self::Block>) {
let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
match self.api.tree_route(from, to) {
Ok(tree_route) => Ok(tree_route),
Err(e) =>
return Err(format!(
"Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
)),
}
};
let id = BlockId::hash(hash);
let block_number = match api.block_id_to_number(&id) {
Ok(Some(number)) => number,
_ => {
log::trace!(
target: "txpool",
"Skipping chain event - no number for that block {:?}",
id,
);
return Box::pin(ready(()))
},
};
let result = self.enactment_state.lock().update(&event, &compute_tree_route);
let next_action = self.revalidation_strategy.lock().next(
block_number,
Some(std::time::Duration::from_secs(60)),
Some(20u32.into()),
);
let revalidation_strategy = self.revalidation_strategy.clone();
let revalidation_queue = self.revalidation_queue.clone();
let ready_poll = self.ready_poll.clone();
let metrics = self.metrics.clone();
async move {
// We keep track of everything we prune so that later we won't add
// transactions with those hashes from the retracted blocks.
let mut pruned_log = HashSet::<ExtrinsicHash<PoolApi>>::new();
// If there is a tree route, we use this to prune known tx based on the enacted
// blocks. Before pruning enacted transactions, we inform the listeners about
// retracted blocks and their transactions. This order is important, because
// if we enact and retract the same transaction at the same time, we want to
// send first the retract and than the prune event.
if let Some(ref tree_route) = tree_route {
for retracted in tree_route.retracted() {
// notify txs awaiting finality that it has been retracted
pool.validated_pool().on_block_retracted(retracted.hash);
}
future::join_all(tree_route.enacted().iter().map(|h| {
prune_known_txs_for_block(BlockId::Hash(h.hash), &*api, &*pool)
}))
.await
.into_iter()
.for_each(|enacted_log| {
pruned_log.extend(enacted_log);
})
}
pruned_log.extend(prune_known_txs_for_block(id, &*api, &*pool).await);
metrics.report(|metrics| {
metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64)
});
if let (true, Some(tree_route)) = (next_action.resubmit, tree_route) {
let mut resubmit_transactions = Vec::new();
for retracted in tree_route.retracted() {
let hash = retracted.hash;
let block_transactions = api
.block_body(&BlockId::hash(hash))
.await
.unwrap_or_else(|e| {
log::warn!("Failed to fetch block body: {}", e);
None
})
.unwrap_or_default()
.into_iter()
.filter(|tx| tx.is_signed().unwrap_or(true));
let mut resubmitted_to_report = 0;
resubmit_transactions.extend(block_transactions.into_iter().filter(
|tx| {
let tx_hash = pool.hash_of(tx);
let contains = pruned_log.contains(&tx_hash);
// need to count all transactions, not just filtered, here
resubmitted_to_report += 1;
if !contains {
log::debug!(
target: "txpool",
"[{:?}]: Resubmitting from retracted block {:?}",
tx_hash,
hash,
);
}
!contains
},
));
metrics.report(|metrics| {
metrics.block_transactions_resubmitted.inc_by(resubmitted_to_report)
});
}
if let Err(e) = pool
.resubmit_at(
&id,
// These transactions are coming from retracted blocks, we should
// simply consider them external.
TransactionSource::External,
resubmit_transactions,
)
.await
{
log::debug!(
target: "txpool",
"[{:?}] Error re-submitting transactions: {}",
id,
e,
)
}
}
let extra_pool = pool.clone();
// After #5200 lands, this arguably might be moved to the
// handler of "all blocks notification".
ready_poll.lock().trigger(block_number, move || {
Box::new(extra_pool.validated_pool().ready())
});
if next_action.revalidate {
let hashes = pool.validated_pool().ready().map(|tx| tx.hash).collect();
revalidation_queue.revalidate_later(block_number, hashes).await;
revalidation_strategy.lock().clear();
}
}
.boxed()
match result {
Err(msg) => {
log::warn!(target: "txpool", "{msg}");
return
},
ChainEvent::Finalized { hash, tree_route } => {
let pool = self.pool.clone();
async move {
for hash in tree_route.iter().chain(&[hash]) {
if let Err(e) = pool.validated_pool().on_block_finalized(*hash).await {
log::warn!(
target: "txpool",
"Error [{}] occurred while attempting to notify watchers of finalization {}",
e, hash
)
}
}
}
.boxed()
Ok(None) => {},
Ok(Some(tree_route)) => {
self.handle_enactment(tree_route).await;
},
};
if let ChainEvent::Finalized { hash, tree_route } = event {
log::trace!(
target: "txpool",
"on-finalized enacted: {tree_route:?}, previously finalized: \
{prev_finalized_block:?}",
);
for hash in tree_route.iter().chain(std::iter::once(&hash)) {
if let Err(e) = self.pool.validated_pool().on_block_finalized(*hash).await {
log::warn!(
target: "txpool",
"Error occurred while attempting to notify watchers about finalization {}: {}",
hash, e
)
}
}
}
}
}