Transactionpool: Make ready_at return earlier (#8995)

`ready_at` returns when we have processed the requested block. However,
on startup we already have processed the best block and there
are no transactions in the pool on startup anyway. So, we can set `updated_at`
to the best block on startup.

Besides that `ready_at` now returns early when there are no ready nor
any future transactions in the pool.
This commit is contained in:
Bastian Köcher
2021-06-02 20:13:47 +02:00
committed by GitHub
parent 554a0cb274
commit 2ab5f9aeca
2 changed files with 58 additions and 18 deletions
@@ -296,7 +296,13 @@ mod tests {
let client = Arc::new(client); let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new(); let spawner = sp_core::testing::TaskExecutor::new();
let pool = Arc::new(BasicPool::with_revalidation_type( let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(), true.into(), api(), None, RevalidationType::Full, spawner.clone(), Options::default(),
true.into(),
api(),
None,
RevalidationType::Full,
spawner.clone(),
0,
)); ));
let env = ProposerFactory::new( let env = ProposerFactory::new(
spawner.clone(), spawner.clone(),
@@ -373,6 +379,7 @@ mod tests {
None, None,
RevalidationType::Full, RevalidationType::Full,
spawner.clone(), spawner.clone(),
0,
)); ));
let env = ProposerFactory::new( let env = ProposerFactory::new(
spawner.clone(), spawner.clone(),
@@ -453,6 +460,7 @@ mod tests {
None, None,
RevalidationType::Full, RevalidationType::Full,
spawner.clone(), spawner.clone(),
0,
)); ));
let env = ProposerFactory::new( let env = ProposerFactory::new(
spawner.clone(), spawner.clone(),
+49 -17
View File
@@ -98,6 +98,13 @@ impl<T, Block: BlockT> Default for ReadyPoll<T, Block> {
} }
impl<T, Block: BlockT> ReadyPoll<T, Block> { impl<T, Block: BlockT> ReadyPoll<T, Block> {
fn new(best_block_number: NumberFor<Block>) -> Self {
Self {
updated_at: best_block_number,
pollers: Default::default(),
}
}
fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) { fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) {
self.updated_at = number; self.updated_at = number;
@@ -189,6 +196,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
prometheus: Option<&PrometheusRegistry>, prometheus: Option<&PrometheusRegistry>,
revalidation_type: RevalidationType, revalidation_type: RevalidationType,
spawner: impl SpawnNamed, spawner: impl SpawnNamed,
best_block_number: NumberFor<Block>,
) -> Self { ) -> Self {
let pool = Arc::new(sc_transaction_graph::Pool::new(options, is_validator, pool_api.clone())); let pool = Arc::new(sc_transaction_graph::Pool::new(options, is_validator, pool_api.clone()));
let (revalidation_queue, background_task) = match revalidation_type { let (revalidation_queue, background_task) = match revalidation_type {
@@ -213,7 +221,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
RevalidationType::Full => RevalidationStrategy::Always, RevalidationType::Full => RevalidationStrategy::Always,
} }
)), )),
ready_poll: Default::default(), ready_poll: Arc::new(Mutex::new(ReadyPoll::new(best_block_number))),
metrics: PrometheusMetrics::new(prometheus), metrics: PrometheusMetrics::new(prometheus),
} }
} }
@@ -309,21 +317,29 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
} }
fn ready_at(&self, at: NumberFor<Self::Block>) -> PolledIterator<PoolApi> { fn ready_at(&self, at: NumberFor<Self::Block>) -> PolledIterator<PoolApi> {
let status = self.status();
// If there are no transactions in the pool, it is fine to return early.
//
// There could be transaction being added because of some re-org happening at the relevant
// block, but this is relative unlikely.
if status.ready == 0 && status.future == 0 {
return async { Box::new(std::iter::empty()) as Box<_> }.boxed()
}
if self.ready_poll.lock().updated_at() >= at { if self.ready_poll.lock().updated_at() >= at {
log::trace!(target: "txpool", "Transaction pool already processed block #{}", at); log::trace!(target: "txpool", "Transaction pool already processed block #{}", at);
let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready()); let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
return Box::pin(futures::future::ready(iterator)); return async move { iterator }.boxed();
} }
Box::pin( self.ready_poll
self.ready_poll .lock()
.lock() .add(at)
.add(at) .map(|received| received.unwrap_or_else(|e| {
.map(|received| received.unwrap_or_else(|e| { log::warn!("Error receiving pending set: {:?}", e);
log::warn!("Error receiving pending set: {:?}", e); Box::new(std::iter::empty())
Box::new(vec![].into_iter()) }))
})) .boxed()
)
} }
fn ready(&self) -> ReadyIteratorFor<PoolApi> { fn ready(&self) -> ReadyIteratorFor<PoolApi> {
@@ -334,7 +350,7 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
impl<Block, Client, Fetcher> LightPool<Block, Client, Fetcher> impl<Block, Client, Fetcher> LightPool<Block, Client, Fetcher>
where where
Block: BlockT, Block: BlockT,
Client: sp_blockchain::HeaderBackend<Block> + 'static, Client: sp_blockchain::HeaderBackend<Block> + sc_client_api::UsageProvider<Block> + 'static,
Fetcher: sc_client_api::Fetcher<Block> + 'static, Fetcher: sc_client_api::Fetcher<Block> + 'static,
{ {
/// Create new basic transaction pool for a light node with the provided api. /// Create new basic transaction pool for a light node with the provided api.
@@ -345,9 +361,15 @@ where
client: Arc<Client>, client: Arc<Client>,
fetcher: Arc<Fetcher>, fetcher: Arc<Fetcher>,
) -> Self { ) -> Self {
let pool_api = Arc::new(LightChainApi::new(client, fetcher)); let pool_api = Arc::new(LightChainApi::new(client.clone(), fetcher));
Self::with_revalidation_type( Self::with_revalidation_type(
options, false.into(), pool_api, prometheus, RevalidationType::Light, spawner, options,
false.into(),
pool_api,
prometheus,
RevalidationType::Light,
spawner,
client.usage_info().chain.best_number,
) )
} }
} }
@@ -357,8 +379,12 @@ where
Block: BlockT, Block: BlockT,
Client: sp_api::ProvideRuntimeApi<Block> Client: sp_api::ProvideRuntimeApi<Block>
+ sc_client_api::BlockBackend<Block> + sc_client_api::BlockBackend<Block>
+ sp_runtime::traits::BlockIdTo<Block>, + sp_runtime::traits::BlockIdTo<Block>
Client: sc_client_api::ExecutorProvider<Block> + Send + Sync + 'static, + sc_client_api::ExecutorProvider<Block>
+ sc_client_api::UsageProvider<Block>
+ Send
+ Sync
+ 'static,
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>, Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
{ {
/// Create new basic transaction pool for a full node with the provided api. /// Create new basic transaction pool for a full node with the provided api.
@@ -371,7 +397,13 @@ where
) -> Arc<Self> { ) -> Arc<Self> {
let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus)); let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus));
let pool = Arc::new(Self::with_revalidation_type( let pool = Arc::new(Self::with_revalidation_type(
options, is_validator, pool_api, prometheus, RevalidationType::Full, spawner options,
is_validator,
pool_api,
prometheus,
RevalidationType::Full,
spawner,
client.usage_info().chain.best_number,
)); ));
// make transaction pool available for off-chain runtime calls. // make transaction pool available for off-chain runtime calls.