Refactor tx-pool maintenance and other high-level api (#4629)

* Reduction.

* Reformation.

* add locked timer stuff

* fix issues and introduce full pool

* arrange together

* fix benches

* fix new_light

* Add revalidation test case

* review fixes

* review fixes

* use just ready future

* address review
This commit is contained in:
Nikolay Volf
2020-01-24 04:21:24 -08:00
committed by GitHub
parent b89ac5d2ef
commit 14e95f3398
12 changed files with 423 additions and 827 deletions
+6 -7
View File
@@ -43,9 +43,7 @@ macro_rules! new_full_start {
.with_transaction_pool(|config, client, _fetcher| {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
let maintainer = sc_transaction_pool::FullBasicPoolMaintainer::new(pool.pool().clone(), client);
let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
Ok(maintainable_pool)
Ok(pool)
})?
.with_import_queue(|_config, client, mut select_chain, transaction_pool| {
let select_chain = select_chain.take()
@@ -207,11 +205,12 @@ pub fn new_light<C: Send + Default + 'static>(config: Configuration<C, GenesisCo
.with_transaction_pool(|config, client, fetcher| {
let fetcher = fetcher
.ok_or_else(|| "Trying to start light transaction pool without active fetcher")?;
let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone());
let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
let maintainer = sc_transaction_pool::LightBasicPoolMaintainer::with_defaults(pool.pool().clone(), client, fetcher);
let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
Ok(maintainable_pool)
let pool = sc_transaction_pool::BasicPool::with_revalidation_type(
config, pool_api, sc_transaction_pool::RevalidationType::Light,
);
Ok(pool)
})?
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
let fetch_checker = fetcher
+8 -16
View File
@@ -64,9 +64,7 @@ macro_rules! new_full_start {
.with_transaction_pool(|config, client, _fetcher| {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
let maintainer = sc_transaction_pool::FullBasicPoolMaintainer::new(pool.pool().clone(), client);
let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
Ok(maintainable_pool)
Ok(pool)
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
let select_chain = select_chain.take()
@@ -272,15 +270,9 @@ type ConcreteClient =
#[allow(dead_code)]
type ConcreteBackend = Backend<ConcreteBlock>;
#[allow(dead_code)]
type ConcreteTransactionPool = sp_transaction_pool::MaintainableTransactionPool<
sc_transaction_pool::BasicPool<
sc_transaction_pool::FullChainApi<ConcreteClient, ConcreteBlock>,
ConcreteBlock
>,
sc_transaction_pool::FullBasicPoolMaintainer<
ConcreteClient,
sc_transaction_pool::FullChainApi<ConcreteClient, Block>
>
type ConcreteTransactionPool = sc_transaction_pool::BasicPool<
sc_transaction_pool::FullChainApi<ConcreteClient, ConcreteBlock>,
ConcreteBlock
>;
/// A specialized configuration object for setting up the node..
@@ -322,10 +314,10 @@ pub fn new_light<C: Send + Default + 'static>(config: NodeConfiguration<C>)
let fetcher = fetcher
.ok_or_else(|| "Trying to start light transaction pool without active fetcher")?;
let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone());
let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
let maintainer = sc_transaction_pool::LightBasicPoolMaintainer::with_defaults(pool.pool().clone(), client, fetcher);
let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
Ok(maintainable_pool)
let pool = sc_transaction_pool::BasicPool::with_revalidation_type(
config, pool_api, sc_transaction_pool::RevalidationType::Light,
);
Ok(pool)
})?
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
let fetch_checker = fetcher
+2 -4
View File
@@ -49,7 +49,7 @@ use std::{
};
use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_transaction_pool::{TransactionPool, TransactionPoolMaintainer};
use sp_transaction_pool::MaintainedTransactionPool;
use sp_blockchain;
use grafana_data_source::{self, record_metrics};
@@ -740,9 +740,7 @@ ServiceBuilder<
TSc: Clone,
TImpQu: 'static + ImportQueue<TBl>,
TNetP: NetworkSpecialization<TBl>,
TExPool: 'static
+ TransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>
+ TransactionPoolMaintainer<Block=TBl, Hash = <TBl as BlockT>::Hash>,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash> + 'static,
TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata> + Clone,
{
+3 -5
View File
@@ -61,7 +61,7 @@ pub use self::builder::{
};
pub use config::{Configuration, Roles, PruningMode};
pub use sc_chain_spec::{ChainSpec, Properties, RuntimeGenesis, Extension as ChainSpecExtension};
pub use sp_transaction_pool::{TransactionPool, TransactionPoolMaintainer, InPoolTransaction, error::IntoPoolError};
pub use sp_transaction_pool::{TransactionPool, InPoolTransaction, error::IntoPoolError};
pub use sc_transaction_pool::txpool::Options as TransactionPoolOptions;
pub use sc_client::FinalityNotifications;
pub use sc_rpc::Metadata as RpcMetadata;
@@ -148,8 +148,7 @@ pub trait AbstractService: 'static + Future<Output = Result<(), Error>> +
/// Chain selection algorithm.
type SelectChain: sp_consensus::SelectChain<Self::Block>;
/// Transaction pool.
type TransactionPool: TransactionPool<Block = Self::Block>
+ TransactionPoolMaintainer<Block = Self::Block>;
type TransactionPool: TransactionPool<Block = Self::Block>;
/// Network specialization.
type NetworkSpecialization: NetworkSpecialization<Self::Block>;
@@ -213,8 +212,7 @@ where
TExec: 'static + sc_client::CallExecutor<TBl> + Send + Sync + Clone,
TRtApi: 'static + Send + Sync,
TSc: sp_consensus::SelectChain<TBl> + 'static + Clone + Send + Unpin,
TExPool: 'static + TransactionPool<Block = TBl>
+ TransactionPoolMaintainer<Block = TBl>,
TExPool: 'static + TransactionPool<Block = TBl>,
TOc: 'static + Send + Sync,
TNetSpec: NetworkSpecialization<TBl>,
{
@@ -16,7 +16,7 @@
use criterion::{criterion_group, criterion_main, Criterion};
use futures::executor::block_on;
use futures::{future::{ready, Ready}, executor::block_on};
use sc_transaction_graph::*;
use sp_runtime::transaction_validity::{ValidTransaction, InvalidTransaction};
use codec::Encode;
@@ -49,7 +49,8 @@ impl ChainApi for TestApi {
type Block = Block;
type Hash = H256;
type Error = sp_transaction_pool::error::Error;
type ValidationFuture = futures::future::Ready<sp_transaction_pool::error::Result<TransactionValidity>>;
type ValidationFuture = Ready<sp_transaction_pool::error::Result<TransactionValidity>>;
type BodyFuture = Ready<sp_transaction_pool::error::Result<Option<Vec<Extrinsic>>>>;
fn validate_transaction(
&self,
@@ -61,14 +62,14 @@ impl ChainApi for TestApi {
match self.block_id_to_number(at) {
Ok(Some(num)) if num > 5 => {
return futures::future::ready(
return ready(
Ok(Err(InvalidTransaction::Stale.into()))
)
},
_ => {},
}
futures::future::ready(
ready(
Ok(Ok(ValidTransaction {
priority: 4,
requires: if nonce > 1 && self.nonce_dependant {
@@ -105,6 +106,10 @@ impl ChainApi for TestApi {
let encoded = uxt.encode();
(blake2_256(&encoded).into(), encoded.len())
}
fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
ready(Ok(None))
}
}
fn uxt(transfer: Transfer) -> Extrinsic {
@@ -150,13 +155,13 @@ fn benchmark_main(c: &mut Criterion) {
c.bench_function("sequential 50 tx", |b| {
b.iter(|| {
bench_configured(Pool::new(Default::default(), TestApi::new_dependant()), 50);
bench_configured(Pool::new(Default::default(), TestApi::new_dependant().into()), 50);
});
});
c.bench_function("random 100 tx", |b| {
b.iter(|| {
bench_configured(Pool::new(Default::default(), TestApi::default()), 100);
bench_configured(Pool::new(Default::default(), TestApi::default().into()), 100);
});
});
}
@@ -68,6 +68,8 @@ pub trait ChainApi: Send + Sync {
type Error: From<error::Error> + error::IntoPoolError;
/// Validate transaction future.
type ValidationFuture: Future<Output=Result<TransactionValidity, Self::Error>> + Send + Unpin;
/// Body future (since block body might be remote)
type BodyFuture: Future<Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>> + Unpin + Send + 'static;
/// Verify extrinsic at given block.
fn validate_transaction(
@@ -84,6 +86,9 @@ pub trait ChainApi: Send + Sync {
/// Returns hash and encoding length of the extrinsic.
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (Self::Hash, usize);
/// Returns a block body given the block id.
fn block_body(&self, at: &BlockId<Self::Block>) -> Self::BodyFuture;
}
/// Pool configuration options.
@@ -120,7 +125,7 @@ pub struct Pool<B: ChainApi> {
impl<B: ChainApi> Pool<B> {
/// Create a new transaction pool.
pub fn new(options: Options, api: B) -> Self {
pub fn new(options: Options, api: Arc<B>) -> Self {
Pool {
validated_pool: Arc::new(ValidatedPool::new(options, api)),
}
@@ -488,6 +493,7 @@ mod tests {
type Hash = u64;
type Error = error::Error;
type ValidationFuture = futures::future::Ready<error::Result<TransactionValidity>>;
type BodyFuture = futures::future::Ready<error::Result<Option<Vec<Extrinsic>>>>;
/// Verify extrinsic at given block.
fn validate_transaction(
@@ -560,6 +566,10 @@ mod tests {
len
)
}
fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
futures::future::ready(Ok(None))
}
}
fn uxt(transfer: Transfer) -> Extrinsic {
@@ -567,7 +577,7 @@ mod tests {
}
fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), TestApi::default())
Pool::new(Default::default(), TestApi::default().into())
}
#[test]
@@ -713,7 +723,7 @@ mod tests {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default());
}, TestApi::default().into());
let hash1 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
@@ -748,7 +758,7 @@ mod tests {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default());
}, TestApi::default().into());
// when
block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer {
@@ -924,7 +934,7 @@ mod tests {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default());
}, TestApi::default().into());
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
@@ -958,7 +968,7 @@ mod tests {
let (tx, rx) = std::sync::mpsc::sync_channel(1);
let mut api = TestApi::default();
api.delay = Arc::new(Mutex::new(rx.into()));
let pool = Arc::new(Pool::new(Default::default(), api));
let pool = Arc::new(Pool::new(Default::default(), api.into()));
// when
let xt = uxt(Transfer {
@@ -63,7 +63,7 @@ pub type ValidatedTransactionFor<B> = ValidatedTransaction<
/// Pool that deals with validated transactions.
pub(crate) struct ValidatedPool<B: ChainApi> {
api: B,
api: Arc<B>,
options: Options,
listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>,
pool: RwLock<base::BasePool<
@@ -76,7 +76,7 @@ pub(crate) struct ValidatedPool<B: ChainApi> {
impl<B: ChainApi> ValidatedPool<B> {
/// Create a new transaction pool.
pub fn new(options: Options, api: B) -> Self {
pub fn new(options: Options, api: Arc<B>) -> Self {
let base_pool = base::BasePool::new(options.reject_future_transactions);
ValidatedPool {
api,
+39 -3
View File
@@ -19,12 +19,13 @@
use std::{marker::PhantomData, pin::Pin, sync::Arc};
use codec::{Decode, Encode};
use futures::{
channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready},
channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready, Ready},
};
use sc_client_api::{
blockchain::HeaderBackend,
light::{Fetcher, RemoteCallRequest}
light::{Fetcher, RemoteCallRequest, RemoteBodyRequest},
BlockBody,
};
use sp_core::Hasher;
use sp_runtime::{
@@ -63,7 +64,7 @@ impl<Client, Block> FullChainApi<Client, Block> where
impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Block> where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockIdTo<Block> + 'static + Send + Sync,
Client: ProvideRuntimeApi<Block> + BlockBody<Block> + BlockIdTo<Block> + 'static + Send + Sync,
Client::Api: TaggedTransactionQueue<Block>,
sp_api::ApiErrorFor<Client, Block>: Send,
{
@@ -71,6 +72,11 @@ impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Bloc
type Hash = Block::Hash;
type Error = error::Error;
type ValidationFuture = Pin<Box<dyn Future<Output = error::Result<TransactionValidity>> + Send>>;
type BodyFuture = Ready<error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>>;
fn block_body(&self, id: &BlockId<Self::Block>) -> Self::BodyFuture {
ready(self.client.block_body(&id).map_err(|e| error::Error::from(e)))
}
fn validate_transaction(
&self,
@@ -149,6 +155,7 @@ impl<Client, F, Block> sc_transaction_graph::ChainApi for LightChainApi<Client,
type Hash = Block::Hash;
type Error = error::Error;
type ValidationFuture = Box<dyn Future<Output = error::Result<TransactionValidity>> + Send + Unpin>;
type BodyFuture = Pin<Box<dyn Future<Output = error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>> + Send>>;
fn validate_transaction(
&self,
@@ -197,4 +204,33 @@ impl<Client, F, Block> sc_transaction_graph::ChainApi for LightChainApi<Client,
(<<Block::Header as HeaderT>::Hashing as HashT>::hash(x), x.len())
})
}
fn block_body(&self, id: &BlockId<Self::Block>) -> Self::BodyFuture {
let header = self.client.header(*id)
.and_then(|h| h.ok_or(sp_blockchain::Error::UnknownBlock(format!("{}", id))));
let header = match header {
Ok(header) => header,
Err(err) => {
log::warn!(target: "txpool", "Failed to query header: {:?}", err);
return Box::pin(ready(Ok(None)));
}
};
let fetcher = self.fetcher.clone();
async move {
let transactions = fetcher.remote_body({
RemoteBodyRequest {
header,
retry_count: None,
}
})
.await
.unwrap_or_else(|e| {
log::warn!(target: "txpool", "Failed to fetch block body: {:?}", e);
Vec::new()
});
Ok(Some(transactions))
}.boxed()
}
}
+222 -9
View File
@@ -20,26 +20,26 @@
#![warn(unused_extern_crates)]
mod api;
mod maintainer;
pub mod error;
#[cfg(test)]
mod tests;
pub use sc_transaction_graph as txpool;
pub use crate::api::{FullChainApi, LightChainApi};
pub use crate::maintainer::{FullBasicPoolMaintainer, LightBasicPoolMaintainer};
use std::{collections::HashMap, sync::Arc};
use futures::{Future, FutureExt};
use std::{collections::HashMap, sync::Arc, pin::Pin, time::Instant};
use futures::{Future, FutureExt, future::ready};
use parking_lot::Mutex;
use sp_runtime::{
generic::BlockId,
traits::Block as BlockT,
traits::{Block as BlockT, NumberFor, SimpleArithmetic, Extrinsic},
};
use sp_transaction_pool::{
TransactionPool, PoolStatus, ImportNotificationStream,
TxHash, TransactionFor, TransactionStatusStreamFor,
TxHash, TransactionFor, TransactionStatusStreamFor, BlockHash,
MaintainedTransactionPool,
};
/// Basic implementation of transaction pool that can be customized by providing PoolApi.
@@ -49,6 +49,25 @@ pub struct BasicPool<PoolApi, Block>
PoolApi: sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
{
pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
api: Arc<PoolApi>,
revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
}
/// Type of revalidation.
pub enum RevalidationType {
/// Light revalidation type.
///
/// During maintenance, transaction pool makes periodic revalidation
/// of all transactions depending on number of blocks or time passed.
/// Also this kind of revalidation does not resubmit transactions from
/// retracted blocks, since it is too expensive.
Light,
/// Full revalidation type.
///
/// During maintenance, transaction pool revalidates some fixed amount of
/// transactions from the pool of valid transactions.
Full,
}
impl<PoolApi, Block> BasicPool<PoolApi, Block>
@@ -57,16 +76,44 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
PoolApi: sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
{
/// Create new basic transaction pool with provided api.
pub fn new(options: sc_transaction_graph::Options, pool_api: PoolApi) -> Self {
pub fn new(
options: sc_transaction_graph::Options,
pool_api: PoolApi,
) -> Self {
Self::with_revalidation_type(options, pool_api, RevalidationType::Full)
}
/// Create new basic transaction pool with provided api and custom
/// revalidation type.
pub fn with_revalidation_type(
options: sc_transaction_graph::Options,
pool_api: PoolApi,
revalidation_type: RevalidationType,
) -> Self {
let api = Arc::new(pool_api);
let cloned_api = api.clone();
BasicPool {
pool: Arc::new(sc_transaction_graph::Pool::new(options, pool_api)),
api: cloned_api,
pool: Arc::new(sc_transaction_graph::Pool::new(options, api)),
revalidation_strategy: Arc::new(Mutex::new(
match revalidation_type {
RevalidationType::Light => RevalidationStrategy::Light(RevalidationStatus::NotScheduled),
RevalidationType::Full => RevalidationStrategy::Always,
}
)),
}
}
/// Gets shared reference to the underlying pool.
pub fn pool(&self) -> &Arc<sc_transaction_graph::Pool<PoolApi>> {
&self.pool
}
#[cfg(test)]
pub fn api(&self) -> &Arc<PoolApi> {
&self.api
}
}
impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
@@ -130,3 +177,169 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
self.pool.on_broadcasted(propagations)
}
}
#[cfg_attr(test, derive(Debug))]
enum RevalidationStatus<N> {
/// The revalidation has never been completed.
NotScheduled,
/// The revalidation is scheduled.
Scheduled(Option<std::time::Instant>, Option<N>),
/// The revalidation is in progress.
InProgress,
}
enum RevalidationStrategy<N> {
Always,
Light(RevalidationStatus<N>)
}
struct RevalidationAction {
revalidate: bool,
resubmit: bool,
revalidate_amount: Option<usize>,
}
impl<N: Clone + Copy + SimpleArithmetic> RevalidationStrategy<N> {
pub fn clear(&mut self) {
if let Self::Light(status) = self {
status.clear()
}
}
pub fn next(
&mut self,
block: N,
revalidate_time_period: Option<std::time::Duration>,
revalidate_block_period: Option<N>,
) -> RevalidationAction {
match self {
Self::Light(status) => RevalidationAction {
revalidate: status.next_required(
block,
revalidate_time_period,
revalidate_block_period
),
resubmit: false,
revalidate_amount: None,
},
Self::Always => RevalidationAction {
revalidate: true,
resubmit: true,
revalidate_amount: Some(16),
}
}
}
}
impl<N: Clone + Copy + SimpleArithmetic> RevalidationStatus<N> {
/// Called when revalidation is completed.
pub fn clear(&mut self) {
*self = Self::NotScheduled;
}
/// Returns true if revalidation is required.
pub fn next_required(
&mut self,
block: N,
revalidate_time_period: Option<std::time::Duration>,
revalidate_block_period: Option<N>,
) -> bool {
match *self {
Self::NotScheduled => {
*self = Self::Scheduled(
revalidate_time_period.map(|period| Instant::now() + period),
revalidate_block_period.map(|period| block + period),
);
false
},
Self::Scheduled(revalidate_at_time, revalidate_at_block) => {
let is_required = revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false)
|| revalidate_at_block.map(|at| block >= at).unwrap_or(false);
if is_required {
*self = Self::InProgress;
}
is_required
},
Self::InProgress => false,
}
}
}
impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
where
Block: BlockT,
PoolApi: 'static + sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash, Error=error::Error>,
{
fn maintain(&self, id: &BlockId<Self::Block>, retracted: &[BlockHash<Self>])
-> Pin<Box<dyn Future<Output=()> + Send>>
{
let id = id.clone();
let pool = self.pool.clone();
let api = self.api.clone();
let block_number = match api.block_id_to_number(&id) {
Ok(Some(number)) => number,
_ => {
log::trace!(target: "txqueue", "Skipping chain event - no number for that block {:?}", id);
return Box::pin(ready(()));
}
};
let next_action = self.revalidation_strategy.lock().next(
block_number,
Some(std::time::Duration::from_secs(60)),
Some(20.into()),
);
let revalidation_strategy = self.revalidation_strategy.clone();
let retracted = retracted.to_vec();
async move {
// We don't query block if we won't prune anything
if !pool.status().is_empty() {
let hashes = api.block_body(&id).await
.unwrap_or_else(|e| {
log::warn!("Prune known transactions: error request {:?}!", e);
None
})
.unwrap_or_default()
.into_iter()
.map(|tx| pool.hash_of(&tx))
.collect::<Vec<_>>();
if let Err(e) = pool.prune_known(&id, &hashes) {
log::error!("Cannot prune known in the pool {:?}!", e);
}
}
if next_action.resubmit {
let mut resubmit_transactions = Vec::new();
for retracted_hash in retracted {
let block_transactions = api.block_body(&BlockId::hash(retracted_hash.clone())).await
.unwrap_or_else(|e| {
log::warn!("Failed to fetch block body {:?}!", e);
None
})
.unwrap_or_default()
.into_iter()
.filter(|tx| tx.is_signed().unwrap_or(true));
resubmit_transactions.extend(block_transactions);
}
if let Err(e) = pool.submit_at(&id, resubmit_transactions, true).await {
log::debug!(target: "txpool",
"[{:?}] Error re-submitting transactions: {:?}", id, e
)
}
}
if next_action.revalidate {
if let Err(e) = pool.revalidate_ready(&id, next_action.revalidate_amount).await {
log::warn!("Revalidate ready failed {:?}", e);
}
}
revalidation_strategy.lock().clear();
}.boxed()
}
}
@@ -1,645 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::{
marker::{PhantomData, Unpin},
sync::Arc,
time::Instant,
};
use futures::{
Future, FutureExt,
future::{Either, join, ready},
};
use log::{warn, debug, trace};
use parking_lot::Mutex;
use sc_client_api::{
client::BlockBody,
light::{Fetcher, RemoteBodyRequest},
};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Extrinsic, Header, NumberFor, SimpleArithmetic},
};
use sp_blockchain::HeaderBackend;
use sp_transaction_pool::{TransactionPoolMaintainer, runtime_api::TaggedTransactionQueue};
use sp_api::ProvideRuntimeApi;
use sc_transaction_graph::{self, ChainApi};
/// Basic transaction pool maintainer for full clients.
pub struct FullBasicPoolMaintainer<Client, PoolApi: ChainApi> {
pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
client: Arc<Client>,
}
impl<Client, PoolApi: ChainApi> FullBasicPoolMaintainer<Client, PoolApi> {
/// Create new basic full pool maintainer.
pub fn new(
pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
client: Arc<Client>,
) -> Self {
FullBasicPoolMaintainer { pool, client }
}
}
impl<Block, Client, PoolApi> TransactionPoolMaintainer
for
FullBasicPoolMaintainer<Client, PoolApi>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + 'static,
Client::Api: TaggedTransactionQueue<Block>,
PoolApi: ChainApi<Block = Block, Hash = Block::Hash> + 'static,
{
type Block = Block;
type Hash = Block::Hash;
fn maintain(
&self,
id: &BlockId<Block>,
retracted: &[Block::Hash],
) -> Box<dyn Future<Output=()> + Send + Unpin> {
let now = std::time::Instant::now();
let took = move || format!("Took {} ms", now.elapsed().as_millis());
let id = *id;
trace!(target: "txpool", "[{:?}] Starting pool maintainance", id);
// Put transactions from retracted blocks back into the pool.
let client_copy = self.client.clone();
let retracted_transactions = retracted.to_vec().into_iter()
.filter_map(move |hash| client_copy.block_body(&BlockId::hash(hash)).ok().unwrap_or(None))
.flat_map(|block| block.into_iter())
// if signed information is not present, attempt to resubmit anyway.
.filter(|tx| tx.is_signed().unwrap_or(true));
let resubmit_future = self.pool
.submit_at(&id, retracted_transactions, true)
.then(move |resubmit_result| ready(match resubmit_result {
Ok(_) => trace!(target: "txpool",
"[{:?}] Re-submitting retracted done. {}", id, took()
),
Err(e) => debug!(target: "txpool",
"[{:?}] Error re-submitting transactions: {:?}", id, e
),
}));
// Avoid calling into runtime if there is nothing to prune from the pool anyway.
if self.pool.status().is_empty() {
return Box::new(resubmit_future)
}
let block = (self.client.header(id), self.client.block_body(&id));
let prune_future = match block {
(Ok(Some(header)), Ok(Some(extrinsics))) => {
let parent_id = BlockId::hash(*header.parent_hash());
let prune_future = self.pool
.prune(&id, &parent_id, &extrinsics)
.then(move |prune_result| ready(match prune_result {
Ok(_) => trace!(target: "txpool",
"[{:?}] Pruning done. {}", id, took()
),
Err(e) => warn!(target: "txpool",
"[{:?}] Error pruning transactions: {:?}", id, e
),
}));
Either::Left(resubmit_future.then(|_| prune_future))
},
(Ok(_), Ok(_)) => Either::Right(resubmit_future),
err => {
warn!(target: "txpool", "[{:?}] Error reading block: {:?}", id, err);
Either::Right(resubmit_future)
},
};
let revalidate_future = self.pool
.revalidate_ready(&id, Some(16))
.then(move |result| ready(match result {
Ok(_) => debug!(target: "txpool",
"[{:?}] Revalidation done: {}", id, took()
),
Err(e) => warn!(target: "txpool",
"[{:?}] Encountered errors while revalidating transactions: {:?}", id, e
),
}));
Box::new(prune_future.then(|_| revalidate_future))
}
}
/// Basic transaction pool maintainer for light clients.
pub struct LightBasicPoolMaintainer<Block: BlockT, Client, PoolApi: ChainApi, F> {
pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
client: Arc<Client>,
fetcher: Arc<F>,
revalidate_time_period: Option<std::time::Duration>,
revalidate_block_period: Option<NumberFor<Block>>,
revalidation_status: Arc<Mutex<TxPoolRevalidationStatus<NumberFor<Block>>>>,
_phantom: PhantomData<Block>,
}
impl<Block, Client, PoolApi, F> LightBasicPoolMaintainer<Block, Client, PoolApi, F>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + 'static,
Client::Api: TaggedTransactionQueue<Block>,
PoolApi: ChainApi<Block = Block, Hash = Block::Hash> + 'static,
F: Fetcher<Block> + 'static,
{
/// Create light pool maintainer with default constants.
///
/// Default constants are: revalidate every 60 seconds or every 20 blocks
/// (whatever happens first).
pub fn with_defaults(
pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
client: Arc<Client>,
fetcher: Arc<F>,
) -> Self {
Self::new(
pool,
client,
fetcher,
Some(std::time::Duration::from_secs(60)),
Some(20.into()),
)
}
/// Create light pool maintainer with passed constants.
pub fn new(
pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
client: Arc<Client>,
fetcher: Arc<F>,
revalidate_time_period: Option<std::time::Duration>,
revalidate_block_period: Option<NumberFor<Block>>,
) -> Self {
Self {
pool,
client,
fetcher,
revalidate_time_period,
revalidate_block_period,
revalidation_status: Arc::new(Mutex::new(TxPoolRevalidationStatus::NotScheduled)),
_phantom: Default::default(),
}
}
/// Returns future that prunes block transactions from the pool.
fn prune(
&self,
id: &BlockId<Block>,
header: &Block::Header,
) -> impl std::future::Future<Output = ()> {
// fetch transactions (possible future optimization: proofs of inclusion) that
// have been included into new block and prune these from the pool
let id = id.clone();
let pool = self.pool.clone();
self.fetcher.remote_body(RemoteBodyRequest {
header: header.clone(),
retry_count: None,
})
.then(move |transactions| ready(
transactions
.map_err(|e| format!("{}", e))
.and_then(|transactions| {
let hashes = transactions
.into_iter()
.map(|tx| pool.hash_of(&tx))
.collect::<Vec<_>>();
pool.prune_known(&id, &hashes)
.map_err(|e| format!("{}", e))
})
))
.then(|r| {
if let Err(e) = r {
warn!("Error pruning known transactions: {}", e)
}
ready(())
})
}
/// Returns future that performs in-pool transations revalidation, if required.
fn revalidate(
&self,
id: &BlockId<Block>,
header: &Block::Header,
) -> impl std::future::Future<Output = ()> {
// to determine whether ready transaction is still valid, we perform periodic revalidaton
// of ready transactions
let is_revalidation_required = self.revalidation_status.lock().is_required(
*header.number(),
self.revalidate_time_period,
self.revalidate_block_period,
);
match is_revalidation_required {
true => {
let revalidation_status = self.revalidation_status.clone();
Either::Left(self.pool
.revalidate_ready(id, None)
.map(|r| r.map_err(|e| warn!("Error revalidating known transactions: {}", e)))
.map(move |_| revalidation_status.lock().clear()))
},
false => Either::Right(ready(())),
}
}
}
impl<Block, Client, PoolApi, F> TransactionPoolMaintainer
for
LightBasicPoolMaintainer<Block, Client, PoolApi, F>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + 'static,
Client::Api: TaggedTransactionQueue<Block>,
PoolApi: ChainApi<Block = Block, Hash = Block::Hash> + 'static,
F: Fetcher<Block> + 'static,
{
type Block = Block;
type Hash = Block::Hash;
fn maintain(
&self,
id: &BlockId<Block>,
_retracted: &[Block::Hash],
) -> Box<dyn Future<Output=()> + Send + Unpin> {
// Do nothing if transaction pool is empty.
if self.pool.status().is_empty() {
self.revalidation_status.lock().clear();
return Box::new(ready(()));
}
let header = self.client.header(*id)
.and_then(|h| h.ok_or(sp_blockchain::Error::UnknownBlock(format!("{}", id))));
let header = match header {
Ok(header) => header,
Err(err) => {
println!("Failed to maintain light tx pool: {:?}", err);
return Box::new(ready(()));
}
};
// else prune block transactions from the pool
let prune_future = self.prune(id, &header);
// and then (optionally) revalidate in-pool transactions
let revalidate_future = self.revalidate(id, &header);
let maintain_future = join(
prune_future,
revalidate_future,
).map(|_| ());
Box::new(maintain_future)
}
}
/// The status of transactions revalidation at light tx pool.
#[cfg_attr(test, derive(Debug))]
enum TxPoolRevalidationStatus<N> {
/// The revalidation has never been completed.
NotScheduled,
/// The revalidation is scheduled.
Scheduled(Option<std::time::Instant>, Option<N>),
/// The revalidation is in progress.
InProgress,
}
impl<N: Clone + Copy + SimpleArithmetic> TxPoolRevalidationStatus<N> {
/// Called when revalidation is completed.
pub fn clear(&mut self) {
*self = TxPoolRevalidationStatus::NotScheduled;
}
/// Returns true if revalidation is required.
pub fn is_required(
&mut self,
block: N,
revalidate_time_period: Option<std::time::Duration>,
revalidate_block_period: Option<N>,
) -> bool {
match *self {
TxPoolRevalidationStatus::NotScheduled => {
*self = TxPoolRevalidationStatus::Scheduled(
revalidate_time_period.map(|period| Instant::now() + period),
revalidate_block_period.map(|period| block + period),
);
false
},
TxPoolRevalidationStatus::Scheduled(revalidate_at_time, revalidate_at_block) => {
let is_required = revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false)
|| revalidate_at_block.map(|at| block >= at).unwrap_or(false);
if is_required {
*self = TxPoolRevalidationStatus::InProgress;
}
is_required
},
TxPoolRevalidationStatus::InProgress => false,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::executor::block_on;
use codec::Encode;
use substrate_test_runtime_client::{
prelude::*, Client, runtime::{Block, Transfer}, sp_consensus::{BlockOrigin, SelectChain},
LongestChain,
};
use sp_transaction_pool::PoolStatus;
use crate::api::{FullChainApi, LightChainApi};
struct TestSetup<Api: ChainApi> {
client: Arc<Client<Backend>>,
longest_chain: LongestChain<Backend, Block>,
pool: Arc<sc_transaction_graph::Pool<Api>>,
}
impl<Api: ChainApi> TestSetup<Api> {
fn new() -> TestSetup<FullChainApi<Client<Backend>, Block>> {
let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain();
let client = Arc::new(client);
let pool = Arc::new(
sc_transaction_graph::Pool::new(Default::default(), FullChainApi::new(client.clone())),
);
TestSetup {
client,
longest_chain,
pool,
}
}
fn new_light<F>(fetcher: Arc<F>) -> TestSetup<LightChainApi<Client<Backend>, F, Block>>
where F: Fetcher<Block> + 'static,
{
let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain();
let client = Arc::new(client);
let pool = Arc::new(
sc_transaction_graph::Pool::new(
Default::default(),
LightChainApi::new(client.clone(), fetcher)
),
);
TestSetup {
client,
longest_chain,
pool,
}
}
}
fn setup() -> TestSetup<FullChainApi<Client<Backend>, Block>> {
TestSetup::<FullChainApi<Client<Backend>, Block>>::new()
}
fn setup_light<F>(fetcher: Arc<F>) -> TestSetup<LightChainApi<Client<Backend>, F, Block>>
where F: Fetcher<Block> + 'static,
{
TestSetup::<LightChainApi<Client<Backend>, F, Block>>::new_light(fetcher)
}
#[test]
fn should_remove_transactions_from_the_full_pool() {
let mut setup = setup();
let transaction = Transfer {
amount: 5,
nonce: 0,
from: AccountKeyring::Alice.into(),
to: Default::default(),
}.into_signed_tx();
let best = setup.longest_chain.best_chain().unwrap();
// store the transaction in the pool
block_on(setup.pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap();
// import the block
let mut builder = setup.client.new_block(Default::default()).unwrap();
builder.push(transaction.clone()).unwrap();
let block = builder.build().unwrap().block;
let id = BlockId::hash(block.header().hash());
setup.client.import(BlockOrigin::Own, block).unwrap();
// fire notification - this should clean up the queue
assert_eq!(setup.pool.status().ready, 1);
block_on(FullBasicPoolMaintainer::new(setup.pool.clone(), setup.client.clone()).maintain(&id, &[]));
// then
assert_eq!(setup.pool.status().ready, 0);
assert_eq!(setup.pool.status().future, 0);
}
#[test]
fn should_remove_transactions_from_the_light_pool() {
let transaction = Transfer {
amount: 5,
nonce: 0,
from: AccountKeyring::Alice.into(),
to: Default::default(),
}.into_signed_tx();
let fetcher_transaction = transaction.clone();
let fetcher = Arc::new(substrate_test_runtime_client::new_light_fetcher()
.with_remote_body(Some(Box::new(move |_| Ok(vec![fetcher_transaction.clone()]))))
.with_remote_call(Some(Box::new(move |_| {
let validity: sp_runtime::transaction_validity::TransactionValidity =
Ok(sp_runtime::transaction_validity::ValidTransaction {
priority: 0,
requires: Vec::new(),
provides: vec![vec![42]],
longevity: 0,
propagate: true,
});
Ok(validity.encode())
}))));
let setup = setup_light(fetcher.clone());
let best = setup.longest_chain.best_chain().unwrap();
// store the transaction in the pool
block_on(setup.pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap();
// fire notification - this should clean up the queue
assert_eq!(setup.pool.status().ready, 1);
block_on(LightBasicPoolMaintainer::with_defaults(setup.pool.clone(), setup.client.clone(), fetcher).maintain(
&BlockId::Number(0),
&[],
));
// then
assert_eq!(setup.pool.status().ready, 0);
assert_eq!(setup.pool.status().future, 0);
}
#[test]
fn should_schedule_transactions_revalidation_at_light_pool() {
// when revalidation is not scheduled, it became scheduled
let mut status = TxPoolRevalidationStatus::NotScheduled;
assert!(!status.is_required(10u32, None, None));
match status {
TxPoolRevalidationStatus::Scheduled(_, _) => (),
_ => panic!("Unexpected status: {:?}", status),
}
// revalidation required at time
let mut status = TxPoolRevalidationStatus::Scheduled(Some(Instant::now()), None);
assert!(status.is_required(10u32, None, None));
match status {
TxPoolRevalidationStatus::InProgress => (),
_ => panic!("Unexpected status: {:?}", status),
}
// revalidation required at block
let mut status = TxPoolRevalidationStatus::Scheduled(None, Some(10));
assert!(status.is_required(10u32, None, None));
match status {
TxPoolRevalidationStatus::InProgress => (),
_ => panic!("Unexpected status: {:?}", status),
}
}
#[test]
fn should_revalidate_transactions_at_light_pool() {
use std::sync::atomic;
use sp_runtime::transaction_validity::*;
let build_fetcher = || {
let validated = Arc::new(atomic::AtomicBool::new(false));
Arc::new(substrate_test_runtime_client::new_light_fetcher()
.with_remote_body(Some(Box::new(move |_| Ok(vec![]))))
.with_remote_call(Some(Box::new(move |_| {
let is_inserted = validated.swap(true, atomic::Ordering::SeqCst);
let validity: TransactionValidity = if is_inserted {
Err(TransactionValidityError::Invalid(
InvalidTransaction::Custom(0)
))
} else {
Ok(ValidTransaction {
priority: 0,
requires: Vec::new(),
provides: vec![vec![42]],
longevity: 0,
propagate: true,
})
};
Ok(validity.encode())
}))))
};
fn with_fetcher_maintain<F: Fetcher<Block> + 'static>(
fetcher: Arc<F>,
revalidate_time_period: Option<std::time::Duration>,
revalidate_block_period: Option<u64>,
prepare_maintainer: impl Fn(&Mutex<TxPoolRevalidationStatus<u64>>),
) -> PoolStatus {
let setup = setup_light(fetcher.clone());
let best = setup.longest_chain.best_chain().unwrap();
// let's prepare maintainer
let maintainer = LightBasicPoolMaintainer::new(
setup.pool.clone(),
setup.client.clone(),
fetcher,
revalidate_time_period,
revalidate_block_period,
);
prepare_maintainer(&*maintainer.revalidation_status);
// store the transaction in the pool
block_on(setup.pool.submit_one(
&BlockId::hash(best.hash()),
Transfer {
amount: 5,
nonce: 0,
from: AccountKeyring::Alice.into(),
to: Default::default(),
}.into_signed_tx(),
)).unwrap();
// and run maintain procedures
block_on(maintainer.maintain(&BlockId::Number(0), &[]));
setup.pool.status()
}
// when revalidation is never required - nothing happens
let fetcher = build_fetcher();
//let maintainer = DefaultLightTransactionPoolMaintainer::new(client.clone(), fetcher.clone(), None, None);
let status = with_fetcher_maintain(fetcher, None, None, |_revalidation_status| {});
assert_eq!(status.ready, 1);
// when revalidation is scheduled by time - it is performed
let fetcher = build_fetcher();
let status = with_fetcher_maintain(fetcher, None, None, |revalidation_status|
*revalidation_status.lock() = TxPoolRevalidationStatus::Scheduled(Some(Instant::now()), None)
);
assert_eq!(status.ready, 0);
// when revalidation is scheduled by block number - it is performed
let fetcher = build_fetcher();
let status = with_fetcher_maintain(fetcher, None, None, |revalidation_status|
*revalidation_status.lock() = TxPoolRevalidationStatus::Scheduled(None, Some(0))
);
assert_eq!(status.ready, 0);
}
#[test]
fn should_add_reverted_transactions_to_the_pool() {
let mut setup = setup();
let transaction = Transfer {
amount: 5,
nonce: 0,
from: AccountKeyring::Alice.into(),
to: Default::default(),
}.into_signed_tx();
let best = setup.longest_chain.best_chain().unwrap();
// store the transaction in the pool
block_on(setup.pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap();
// import the block
let mut builder = setup.client.new_block(Default::default()).unwrap();
builder.push(transaction.clone()).unwrap();
let block = builder.build().unwrap().block;
let block1_hash = block.header().hash();
let id = BlockId::hash(block1_hash.clone());
setup.client.import(BlockOrigin::Own, block).unwrap();
// fire notification - this should clean up the queue
assert_eq!(setup.pool.status().ready, 1);
block_on(FullBasicPoolMaintainer::new(setup.pool.clone(), setup.client.clone()).maintain(&id, &[]));
// then
assert_eq!(setup.pool.status().ready, 0);
assert_eq!(setup.pool.status().future, 0);
// import second block
let builder = setup.client.new_block_at(
&BlockId::hash(best.hash()),
Default::default(),
false,
).unwrap();
let block = builder.build().unwrap().block;
let id = BlockId::hash(block.header().hash());
setup.client.import(BlockOrigin::Own, block).unwrap();
// fire notification - this should add the transaction back to the pool.
block_on(FullBasicPoolMaintainer::new(setup.pool.clone(), setup.client.clone()).maintain(&id, &[block1_hash]));
// then
assert_eq!(setup.pool.status().ready, 1);
assert_eq!(setup.pool.status().future, 0);
}
}
+105 -18
View File
@@ -14,29 +14,53 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use crate::{BasicPool, MaintainedTransactionPool};
use codec::Encode;
use futures::executor::block_on;
use sc_transaction_graph::{self, Pool};
use substrate_test_runtime_client::{runtime::{AccountId, Block, Hash, Index, Extrinsic, Transfer}, AccountKeyring::{self, *}};
use parking_lot::RwLock;
use sc_transaction_graph::{self, ExHash, Pool};
use sp_runtime::{
generic::{self, BlockId},
traits::{Hash as HashT, BlakeTwo256},
transaction_validity::{TransactionValidity, ValidTransaction},
traits::{BlakeTwo256, Hash as HashT},
transaction_validity::{TransactionValidity, ValidTransaction, TransactionValidityError, InvalidTransaction},
};
use std::collections::HashSet;
use substrate_test_runtime_client::{
runtime::{AccountId, Block, BlockNumber, Extrinsic, Hash, Header, Index, Transfer},
AccountKeyring::{self, *},
};
struct TestApi {
pub modifier: Box<dyn Fn(&mut ValidTransaction) + Send + Sync>,
pub modifier: RwLock<Box<dyn Fn(&mut ValidTransaction) + Send + Sync>>,
pub chain_block_by_number: RwLock<HashMap<BlockNumber, Vec<Extrinsic>>>,
pub chain_headers_by_number: RwLock<HashMap<BlockNumber, Header>>,
pub invalid_hashes: RwLock<HashSet<ExHash<Self>>>,
pub validation_requests: RwLock<Vec<Extrinsic>>,
}
impl TestApi {
fn default() -> Self {
TestApi {
modifier: Box::new(|_| {}),
modifier: RwLock::new(Box::new(|_| {})),
chain_block_by_number: RwLock::new(HashMap::new()),
invalid_hashes: RwLock::new(HashSet::new()),
chain_headers_by_number: RwLock::new(HashMap::new()),
validation_requests: RwLock::new(Default::default()),
}
}
fn push_block(&self, block_number: BlockNumber, xts: Vec<Extrinsic>) {
self.chain_block_by_number.write().insert(block_number, xts);
self.chain_headers_by_number.write().insert(block_number, Header {
number: block_number,
digest: Default::default(),
extrinsics_root: Default::default(),
parent_hash: Default::default(),
state_root: Default::default(),
});
}
}
impl sc_transaction_graph::ChainApi for TestApi {
@@ -44,12 +68,16 @@ impl sc_transaction_graph::ChainApi for TestApi {
type Hash = Hash;
type Error = error::Error;
type ValidationFuture = futures::future::Ready<error::Result<TransactionValidity>>;
type BodyFuture = futures::future::Ready<error::Result<Option<Vec<Extrinsic>>>>;
fn validate_transaction(
&self,
at: &BlockId<Self::Block>,
uxt: sc_transaction_graph::ExtrinsicFor<Self>,
) -> Self::ValidationFuture {
self.validation_requests.write().push(uxt.clone());
let expected = index(at);
let requires = if expected == uxt.transfer().nonce {
vec![]
@@ -58,6 +86,12 @@ impl sc_transaction_graph::ChainApi for TestApi {
};
let provides = vec![vec![uxt.transfer().nonce as u8]];
if self.invalid_hashes.read().contains(&self.hash_and_length(&uxt).0) {
return futures::future::ready(Ok(
Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(0)))
))
}
let mut validity = ValidTransaction {
priority: 1,
requires,
@@ -66,29 +100,43 @@ impl sc_transaction_graph::ChainApi for TestApi {
propagate: true,
};
(self.modifier)(&mut validity);
(self.modifier.read())(&mut validity);
futures::future::ready(Ok(
Ok(validity)
))
futures::future::ready(Ok(Ok(validity)))
}
fn block_id_to_number(&self, at: &BlockId<Self::Block>) -> error::Result<Option<sc_transaction_graph::NumberFor<Self>>> {
fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
) -> error::Result<Option<sc_transaction_graph::NumberFor<Self>>> {
Ok(Some(number_of(at)))
}
fn block_id_to_hash(&self, at: &BlockId<Self::Block>) -> error::Result<Option<sc_transaction_graph::BlockHash<Self>>> {
fn block_id_to_hash(
&self,
at: &BlockId<Self::Block>,
) -> error::Result<Option<sc_transaction_graph::BlockHash<Self>>> {
Ok(match at {
generic::BlockId::Hash(x) => Some(x.clone()),
_ => Some(Default::default()),
})
}
fn hash_and_length(&self, ex: &sc_transaction_graph::ExtrinsicFor<Self>) -> (Self::Hash, usize) {
fn hash_and_length(
&self,
ex: &sc_transaction_graph::ExtrinsicFor<Self>,
) -> (Self::Hash, usize) {
let encoded = ex.encode();
(BlakeTwo256::hash(&encoded), encoded.len())
}
fn block_body(&self, id: &BlockId<Self::Block>) -> Self::BodyFuture {
futures::future::ready(Ok(if let BlockId::Number(num) = id {
self.chain_block_by_number.read().get(num).cloned()
} else {
None
}))
}
}
fn index(at: &BlockId<Block>) -> u64 {
@@ -114,7 +162,11 @@ fn uxt(who: AccountKeyring, nonce: Index) -> Extrinsic {
}
fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), TestApi::default())
Pool::new(Default::default(), TestApi::default().into())
}
fn maintained_pool() -> BasicPool<TestApi, Block> {
BasicPool::new(Default::default(), TestApi::default())
}
#[test]
@@ -192,11 +244,11 @@ fn should_ban_invalid_transactions() {
#[test]
fn should_correctly_prune_transactions_providing_more_than_one_tag() {
let mut api = TestApi::default();
api.modifier = Box::new(|v: &mut ValidTransaction| {
let api = TestApi::default();
*api.modifier.write() = Box::new(|v: &mut ValidTransaction| {
v.provides.push(vec![155]);
});
let pool = Pool::new(Default::default(), api);
let pool = Pool::new(Default::default(), Arc::new(api));
let xt = uxt(Alice, 209);
block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
@@ -220,3 +272,38 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() {
assert_eq!(pool.status().ready, 0);
assert_eq!(pool.status().future, 2);
}
#[test]
fn should_prune_old_during_maintenance() {
let xt = uxt(Alice, 209);
let pool = maintained_pool();
block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
pool.api.push_block(1, vec![xt.clone()]);
block_on(pool.maintain(&BlockId::number(1), &[]));
assert_eq!(pool.status().ready, 0);
}
#[test]
fn should_revalidate_during_maintenance() {
let xt1 = uxt(Alice, 209);
let xt2 = uxt(Alice, 210);
let pool = maintained_pool();
block_on(pool.submit_one(&BlockId::number(0), xt1.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), xt2.clone())).expect("2. Imported");
assert_eq!(pool.status().ready, 2);
assert_eq!(pool.api.validation_requests.read().len(), 2);
pool.api.push_block(1, vec![xt1.clone()]);
block_on(pool.maintain(&BlockId::number(1), &[]));
assert_eq!(pool.status().ready, 1);
// test that pool revalidated transaction that left ready and not included in the block
assert_eq!(pool.api.validation_requests.read().len(), 3);
}
@@ -20,6 +20,7 @@ use std::{
collections::HashMap,
hash::Hash,
sync::Arc,
pin::Pin,
};
use futures::{
Future, Stream,
@@ -225,6 +226,13 @@ pub trait TransactionPool: Send + Sync {
fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self>;
}
/// Trait for transaction pool maintenance.
pub trait MaintainedTransactionPool : TransactionPool {
/// Perform maintenance
fn maintain(&self, block: &BlockId<Self::Block>, retracted: &[BlockHash<Self>])
-> Pin<Box<dyn Future<Output=()> + Send>>;
}
/// An abstraction for transaction pool.
///
/// This trait is used by offchain calls to be able to submit transactions.
@@ -264,109 +272,4 @@ impl<TPool: TransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
e
))
}
}
/// Transaction pool maintainer interface.
pub trait TransactionPoolMaintainer: Send + Sync {
/// Block type.
type Block: BlockT;
/// Transaction Hash type.
type Hash: Hash + Eq + Member + Serialize;
/// Returns a future that performs maintenance procedures on the pool when
/// with given hash is imported.
fn maintain(
&self,
id: &BlockId<Self::Block>,
retracted: &[Self::Hash],
) -> Box<dyn Future<Output=()> + Send + Unpin>;
}
/// Maintainable pool implementation.
pub struct MaintainableTransactionPool<Pool, Maintainer> {
pool: Pool,
maintainer: Maintainer,
}
impl<Pool, Maintainer> MaintainableTransactionPool<Pool, Maintainer> {
/// Create new maintainable pool using underlying pool and maintainer.
pub fn new(pool: Pool, maintainer: Maintainer) -> Self {
MaintainableTransactionPool { pool, maintainer }
}
}
impl<Pool, Maintainer> TransactionPool for MaintainableTransactionPool<Pool, Maintainer>
where
Pool: TransactionPool,
Maintainer: Send + Sync,
{
type Block = Pool::Block;
type Hash = Pool::Hash;
type InPoolTransaction = Pool::InPoolTransaction;
type Error = Pool::Error;
fn submit_at(
&self,
at: &BlockId<Self::Block>,
xts: impl IntoIterator<Item=TransactionFor<Self>> + 'static,
) -> Box<dyn Future<Output=Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error>> + Send + Unpin> {
self.pool.submit_at(at, xts)
}
fn submit_one(
&self,
at: &BlockId<Self::Block>,
xt: TransactionFor<Self>,
) -> Box<dyn Future<Output=Result<TxHash<Self>, Self::Error>> + Send + Unpin> {
self.pool.submit_one(at, xt)
}
fn submit_and_watch(
&self,
at: &BlockId<Self::Block>,
xt: TransactionFor<Self>,
) -> Box<dyn Future<Output=Result<Box<TransactionStatusStreamFor<Self>>, Self::Error>> + Send + Unpin> {
self.pool.submit_and_watch(at, xt)
}
fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
self.pool.remove_invalid(hashes)
}
fn status(&self) -> PoolStatus {
self.pool.status()
}
fn ready(&self) -> Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>>> {
self.pool.ready()
}
fn import_notification_stream(&self) -> ImportNotificationStream {
self.pool.import_notification_stream()
}
fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
self.pool.hash_of(xt)
}
fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
self.pool.on_broadcasted(propagations)
}
}
impl<Pool, Maintainer> TransactionPoolMaintainer for MaintainableTransactionPool<Pool, Maintainer>
where
Pool: Send + Sync,
Maintainer: TransactionPoolMaintainer
{
type Block = Maintainer::Block;
type Hash = Maintainer::Hash;
fn maintain(
&self,
id: &BlockId<Self::Block>,
retracted: &[Self::Hash],
) -> Box<dyn Future<Output=()> + Send + Unpin> {
self.maintainer.maintain(id, retracted)
}
}
}