BlockId removal: tx-pool refactor (#1678)

It changes following APIs:
- trait `ChainApi`
-- `validate_transaction`

- trait `TransactionPool` 
--`submit_at`
--`submit_one`
--`submit_and_watch`

and some implementation details, in particular:
- impl `Pool` 
--`submit_at`
--`resubmit_at`
--`submit_one`
--`submit_and_watch`
--`prune_known`
--`prune`
--`prune_tags`
--`resolve_block_number`
--`verify`
--`verify_one`

- revalidation queue

All tests are also adjusted.

---------

Co-authored-by: command-bot <>
Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
Michal Kucharczyk
2023-09-27 11:58:39 +02:00
committed by GitHub
parent a846b74604
commit ab3a3bc278
20 changed files with 609 additions and 460 deletions
+11 -16
View File
@@ -133,13 +133,12 @@ where
fn validate_transaction(
&self,
at: &BlockId<Self::Block>,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
uxt: graph::ExtrinsicFor<Self>,
) -> Self::ValidationFuture {
let (tx, rx) = oneshot::channel();
let client = self.client.clone();
let at = *at;
let validation_pool = self.validation_pool.clone();
let metrics = self.metrics.clone();
@@ -151,7 +150,7 @@ where
.await
.send(
async move {
let res = validate_transaction_blocking(&*client, &at, source, uxt);
let res = validate_transaction_blocking(&*client, at, source, uxt);
let _ = tx.send(res);
metrics.report(|m| m.validations_finished.inc());
}
@@ -209,7 +208,7 @@ where
/// This method will call into the runtime to perform the validation.
fn validate_transaction_blocking<Client, Block>(
client: &Client,
at: &BlockId<Block>,
at: Block::Hash,
source: TransactionSource,
uxt: graph::ExtrinsicFor<FullChainApi<Client, Block>>,
) -> error::Result<TransactionValidity>
@@ -225,14 +224,10 @@ where
{
sp_tracing::within_span!(sp_tracing::Level::TRACE, "validate_transaction";
{
let block_hash = client.to_hash(at)
.map_err(|e| Error::RuntimeApi(e.to_string()))?
.ok_or_else(|| Error::RuntimeApi(format!("Could not get hash for block `{:?}`.", at)))?;
let runtime_api = client.runtime_api();
let api_version = sp_tracing::within_span! { sp_tracing::Level::TRACE, "check_version";
runtime_api
.api_version::<dyn TaggedTransactionQueue<Block>>(block_hash)
.api_version::<dyn TaggedTransactionQueue<Block>>(at)
.map_err(|e| Error::RuntimeApi(e.to_string()))?
.ok_or_else(|| Error::RuntimeApi(
format!("Could not find `TaggedTransactionQueue` api for block `{:?}`.", at)
@@ -245,31 +240,31 @@ where
sp_tracing::Level::TRACE, "runtime::validate_transaction";
{
if api_version >= 3 {
runtime_api.validate_transaction(block_hash, source, uxt, block_hash)
runtime_api.validate_transaction(at, source, uxt, at)
.map_err(|e| Error::RuntimeApi(e.to_string()))
} else {
let block_number = client.to_number(at)
let block_number = client.to_number(&BlockId::Hash(at))
.map_err(|e| Error::RuntimeApi(e.to_string()))?
.ok_or_else(||
Error::RuntimeApi(format!("Could not get number for block `{:?}`.", at))
)?;
// The old versions require us to call `initialize_block` before.
runtime_api.initialize_block(block_hash, &sp_runtime::traits::Header::new(
runtime_api.initialize_block(at, &sp_runtime::traits::Header::new(
block_number + sp_runtime::traits::One::one(),
Default::default(),
Default::default(),
block_hash,
at,
Default::default()),
).map_err(|e| Error::RuntimeApi(e.to_string()))?;
if api_version == 2 {
#[allow(deprecated)] // old validate_transaction
runtime_api.validate_transaction_before_version_3(block_hash, source, uxt)
runtime_api.validate_transaction_before_version_3(at, source, uxt)
.map_err(|e| Error::RuntimeApi(e.to_string()))
} else {
#[allow(deprecated)] // old validate_transaction
runtime_api.validate_transaction_before_version_2(block_hash, uxt)
runtime_api.validate_transaction_before_version_2(at, uxt)
.map_err(|e| Error::RuntimeApi(e.to_string()))
}
}
@@ -294,7 +289,7 @@ where
/// the runtime locally.
pub fn validate_transaction_blocking(
&self,
at: &BlockId<Block>,
at: Block::Hash,
source: TransactionSource,
uxt: graph::ExtrinsicFor<Self>,
) -> error::Result<TransactionValidity> {
@@ -71,7 +71,7 @@ pub trait ChainApi: Send + Sync {
/// Verify extrinsic at given block.
fn validate_transaction(
&self,
at: &BlockId<Self::Block>,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
uxt: ExtrinsicFor<Self>,
) -> Self::ValidationFuture;
@@ -154,7 +154,7 @@ impl<B: ChainApi> Pool<B> {
/// Imports a bunch of unverified extrinsics to the pool
pub async fn submit_at(
&self,
at: &BlockId<B::Block>,
at: <B::Block as BlockT>::Hash,
source: TransactionSource,
xts: impl IntoIterator<Item = ExtrinsicFor<B>>,
) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> {
@@ -168,7 +168,7 @@ impl<B: ChainApi> Pool<B> {
/// This does not check if a transaction is banned, before we verify it again.
pub async fn resubmit_at(
&self,
at: &BlockId<B::Block>,
at: <B::Block as BlockT>::Hash,
source: TransactionSource,
xts: impl IntoIterator<Item = ExtrinsicFor<B>>,
) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> {
@@ -180,7 +180,7 @@ impl<B: ChainApi> Pool<B> {
/// Imports one unverified extrinsic to the pool
pub async fn submit_one(
&self,
at: &BlockId<B::Block>,
at: <B::Block as BlockT>::Hash,
source: TransactionSource,
xt: ExtrinsicFor<B>,
) -> Result<ExtrinsicHash<B>, B::Error> {
@@ -191,11 +191,11 @@ impl<B: ChainApi> Pool<B> {
/// Import a single extrinsic and starts to watch its progress in the pool.
pub async fn submit_and_watch(
&self,
at: &BlockId<B::Block>,
at: <B::Block as BlockT>::Hash,
source: TransactionSource,
xt: ExtrinsicFor<B>,
) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
let block_number = self.resolve_block_number(at)?;
let block_number = self.resolve_block_number(&BlockId::Hash(at))?;
let (_, tx) = self
.verify_one(at, block_number, source, xt, CheckBannedBeforeVerify::Yes)
.await;
@@ -246,8 +246,8 @@ impl<B: ChainApi> Pool<B> {
/// their provided tags from there. Otherwise we query the runtime at the `parent` block.
pub async fn prune(
&self,
at: &BlockId<B::Block>,
parent: &BlockId<B::Block>,
at: <B::Block as BlockT>::Hash,
parent: <B::Block as BlockT>::Hash,
extrinsics: &[ExtrinsicFor<B>],
) -> Result<(), B::Error> {
log::debug!(
@@ -324,7 +324,7 @@ impl<B: ChainApi> Pool<B> {
/// prevent importing them in the (near) future.
pub async fn prune_tags(
&self,
at: &BlockId<B::Block>,
at: <B::Block as BlockT>::Hash,
tags: impl IntoIterator<Item = Tag>,
known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
) -> Result<(), B::Error> {
@@ -351,7 +351,7 @@ impl<B: ChainApi> Pool<B> {
// And finally - submit reverified transactions back to the pool
self.validated_pool.resubmit_pruned(
at,
&BlockId::Hash(at),
known_imported_hashes,
pruned_hashes,
reverified_transactions.into_values().collect(),
@@ -373,12 +373,12 @@ impl<B: ChainApi> Pool<B> {
/// Returns future that validates a bunch of transactions at given block.
async fn verify(
&self,
at: &BlockId<B::Block>,
at: <B::Block as BlockT>::Hash,
xts: impl IntoIterator<Item = (TransactionSource, ExtrinsicFor<B>)>,
check: CheckBannedBeforeVerify,
) -> Result<HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>, B::Error> {
// we need a block number to compute tx validity
let block_number = self.resolve_block_number(at)?;
let block_number = self.resolve_block_number(&BlockId::Hash(at))?;
let res = futures::future::join_all(
xts.into_iter()
@@ -394,7 +394,7 @@ impl<B: ChainApi> Pool<B> {
/// Returns future that validates single transaction at given block.
async fn verify_one(
&self,
block_id: &BlockId<B::Block>,
block_hash: <B::Block as BlockT>::Hash,
block_number: NumberFor<B>,
source: TransactionSource,
xt: ExtrinsicFor<B>,
@@ -410,7 +410,7 @@ impl<B: ChainApi> Pool<B> {
let validation_result = self
.validated_pool
.api()
.validate_transaction(block_id, source, xt.clone())
.validate_transaction(block_hash, source, xt.clone())
.await;
let status = match validation_result {
@@ -458,6 +458,7 @@ mod tests {
use super::{super::base_pool::Limit, *};
use crate::tests::{pool, uxt, TestApi, INVALID_NONCE};
use assert_matches::assert_matches;
use codec::Encode;
use futures::executor::block_on;
use parking_lot::Mutex;
use sc_transaction_pool_api::TransactionStatus;
@@ -471,11 +472,11 @@ mod tests {
#[test]
fn should_validate_and_import_transaction() {
// given
let pool = pool();
let (pool, api) = pool();
// when
let hash = block_on(pool.submit_one(
&BlockId::Number(0),
api.expect_hash_from_number(0),
SOURCE,
uxt(Transfer {
from: Alice.into(),
@@ -493,7 +494,7 @@ mod tests {
#[test]
fn should_reject_if_temporarily_banned() {
// given
let pool = pool();
let (pool, api) = pool();
let uxt = uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
@@ -503,7 +504,7 @@ mod tests {
// when
pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
let res = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt));
let res = block_on(pool.submit_one(api.expect_hash_from_number(0), SOURCE, uxt));
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
@@ -514,18 +515,19 @@ mod tests {
#[test]
fn should_reject_unactionable_transactions() {
// given
let api = Arc::new(TestApi::default());
let pool = Pool::new(
Default::default(),
// the node does not author blocks
false.into(),
TestApi::default().into(),
api.clone(),
);
// after validation `IncludeData` will be set to non-propagable (validate_transaction mock)
let uxt = ExtrinsicBuilder::new_include_data(vec![42]).build();
// when
let res = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt));
let res = block_on(pool.submit_one(api.expect_hash_from_number(0), SOURCE, uxt));
// then
assert_matches!(res.unwrap_err(), error::Error::Unactionable);
@@ -535,12 +537,13 @@ mod tests {
fn should_notify_about_pool_events() {
let (stream, hash0, hash1) = {
// given
let pool = pool();
let (pool, api) = pool();
let hash_of_block0 = api.expect_hash_from_number(0);
let stream = pool.validated_pool().import_notification_stream();
// when
let hash0 = block_on(pool.submit_one(
&BlockId::Number(0),
hash_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
@@ -551,7 +554,7 @@ mod tests {
))
.unwrap();
let hash1 = block_on(pool.submit_one(
&BlockId::Number(0),
hash_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
@@ -563,7 +566,7 @@ mod tests {
.unwrap();
// future doesn't count
let _hash = block_on(pool.submit_one(
&BlockId::Number(0),
hash_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
@@ -590,9 +593,10 @@ mod tests {
#[test]
fn should_clear_stale_transactions() {
// given
let pool = pool();
let (pool, api) = pool();
let hash_of_block0 = api.expect_hash_from_number(0);
let hash1 = block_on(pool.submit_one(
&BlockId::Number(0),
hash_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
@@ -603,7 +607,7 @@ mod tests {
))
.unwrap();
let hash2 = block_on(pool.submit_one(
&BlockId::Number(0),
hash_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
@@ -614,7 +618,7 @@ mod tests {
))
.unwrap();
let hash3 = block_on(pool.submit_one(
&BlockId::Number(0),
hash_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
@@ -641,9 +645,9 @@ mod tests {
#[test]
fn should_ban_mined_transactions() {
// given
let pool = pool();
let (pool, api) = pool();
let hash1 = block_on(pool.submit_one(
&BlockId::Number(0),
api.expect_hash_from_number(0),
SOURCE,
uxt(Transfer {
from: Alice.into(),
@@ -655,12 +659,12 @@ mod tests {
.unwrap();
// when
block_on(pool.prune_tags(&BlockId::Number(1), vec![vec![0]], vec![hash1])).unwrap();
block_on(pool.prune_tags(api.expect_hash_from_number(1), vec![vec![0]], vec![hash1]))
.unwrap();
// then
assert!(pool.validated_pool.is_banned(&hash1));
}
use codec::Encode;
#[test]
fn should_limit_futures() {
@@ -678,14 +682,15 @@ mod tests {
let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
let pool = Pool::new(options, true.into(), TestApi::default().into());
let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, xt)).unwrap();
let hash1 = block_on(pool.submit_one(api.expect_hash_from_number(0), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().future, 1);
// when
let hash2 = block_on(pool.submit_one(
&BlockId::Number(0),
api.expect_hash_from_number(0),
SOURCE,
uxt(Transfer {
from: Bob.into(),
@@ -709,11 +714,12 @@ mod tests {
let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
let pool = Pool::new(options, true.into(), TestApi::default().into());
let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
// when
block_on(pool.submit_one(
&BlockId::Number(0),
api.expect_hash_from_number(0),
SOURCE,
uxt(Transfer {
from: Alice.into(),
@@ -732,11 +738,11 @@ mod tests {
#[test]
fn should_reject_transactions_with_no_provides() {
// given
let pool = pool();
let (pool, api) = pool();
// when
let err = block_on(pool.submit_one(
&BlockId::Number(0),
api.expect_hash_from_number(0),
SOURCE,
uxt(Transfer {
from: Alice.into(),
@@ -759,9 +765,9 @@ mod tests {
#[test]
fn should_trigger_ready_and_finalized() {
// given
let pool = pool();
let (pool, api) = pool();
let watcher = block_on(pool.submit_and_watch(
&BlockId::Number(0),
api.expect_hash_from_number(0),
SOURCE,
uxt(Transfer {
from: Alice.into(),
@@ -774,26 +780,25 @@ mod tests {
assert_eq!(pool.validated_pool().status().ready, 1);
assert_eq!(pool.validated_pool().status().future, 0);
let hash_of_block2 = api.expect_hash_from_number(2);
// when
block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![])).unwrap();
block_on(pool.prune_tags(hash_of_block2, vec![vec![0u8]], vec![])).unwrap();
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
// then
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(
stream.next(),
Some(TransactionStatus::InBlock((H256::from_low_u64_be(2).into(), 0))),
);
assert_eq!(stream.next(), Some(TransactionStatus::InBlock((hash_of_block2.into(), 0))),);
}
#[test]
fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
// given
let pool = pool();
let (pool, api) = pool();
let watcher = block_on(pool.submit_and_watch(
&BlockId::Number(0),
api.expect_hash_from_number(0),
SOURCE,
uxt(Transfer {
from: Alice.into(),
@@ -806,8 +811,10 @@ mod tests {
assert_eq!(pool.validated_pool().status().ready, 1);
assert_eq!(pool.validated_pool().status().future, 0);
let hash_of_block2 = api.expect_hash_from_number(2);
// when
block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![*watcher.hash()]))
block_on(pool.prune_tags(hash_of_block2, vec![vec![0u8]], vec![*watcher.hash()]))
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
@@ -815,18 +822,17 @@ mod tests {
// then
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(
stream.next(),
Some(TransactionStatus::InBlock((H256::from_low_u64_be(2).into(), 0))),
);
assert_eq!(stream.next(), Some(TransactionStatus::InBlock((hash_of_block2.into(), 0))),);
}
#[test]
fn should_trigger_future_and_ready_after_promoted() {
// given
let pool = pool();
let (pool, api) = pool();
let hash_of_block0 = api.expect_hash_from_number(0);
let watcher = block_on(pool.submit_and_watch(
&BlockId::Number(0),
hash_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
@@ -841,7 +847,7 @@ mod tests {
// when
block_on(pool.submit_one(
&BlockId::Number(0),
hash_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
@@ -862,7 +868,7 @@ mod tests {
#[test]
fn should_trigger_invalid_and_ban() {
// given
let pool = pool();
let (pool, api) = pool();
let uxt = uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
@@ -870,7 +876,8 @@ mod tests {
nonce: 0,
});
let watcher =
block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt)).unwrap();
block_on(pool.submit_and_watch(api.expect_hash_from_number(0), SOURCE, uxt))
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
// when
@@ -886,7 +893,7 @@ mod tests {
#[test]
fn should_trigger_broadcasted() {
// given
let pool = pool();
let (pool, api) = pool();
let uxt = uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
@@ -894,7 +901,8 @@ mod tests {
nonce: 0,
});
let watcher =
block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt)).unwrap();
block_on(pool.submit_and_watch(api.expect_hash_from_number(0), SOURCE, uxt))
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
// when
@@ -916,7 +924,8 @@ mod tests {
let options =
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
let pool = Pool::new(options, true.into(), TestApi::default().into());
let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let xt = uxt(Transfer {
from: Alice.into(),
@@ -924,7 +933,9 @@ mod tests {
amount: 5,
nonce: 0,
});
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, xt)).unwrap();
let watcher =
block_on(pool.submit_and_watch(api.expect_hash_from_number(0), SOURCE, xt))
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
// when
@@ -934,7 +945,7 @@ mod tests {
amount: 4,
nonce: 1,
});
block_on(pool.submit_one(&BlockId::Number(1), SOURCE, xt)).unwrap();
block_on(pool.submit_one(api.expect_hash_from_number(1), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
// then
@@ -951,12 +962,13 @@ mod tests {
let options =
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
let pool = Pool::new(options, true.into(), TestApi::default().into());
let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
// after validation `IncludeData` will have priority set to 9001
// (validate_transaction mock)
let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
block_on(pool.submit_one(&BlockId::Number(0), SOURCE, xt)).unwrap();
block_on(pool.submit_one(api.expect_hash_from_number(0), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
// then
@@ -968,7 +980,7 @@ mod tests {
amount: 4,
nonce: 1,
});
let result = block_on(pool.submit_one(&BlockId::Number(1), SOURCE, xt));
let result = block_on(pool.submit_one(api.expect_hash_from_number(1), SOURCE, xt));
assert!(matches!(
result,
Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped)
@@ -980,12 +992,15 @@ mod tests {
let options =
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
let pool = Pool::new(options, true.into(), TestApi::default().into());
let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let hash_of_block0 = api.expect_hash_from_number(0);
// after validation `IncludeData` will have priority set to 9001
// (validate_transaction mock)
let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, xt)).unwrap();
block_on(pool.submit_and_watch(hash_of_block0, SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
// after validation `Transfer` will have priority set to 4 (validate_transaction
@@ -996,15 +1011,14 @@ mod tests {
amount: 5,
nonce: 0,
});
let watcher =
block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, xt)).unwrap();
let watcher = block_on(pool.submit_and_watch(hash_of_block0, SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 2);
// when
// after validation `Store` will have priority set to 9001 (validate_transaction
// mock)
let xt = ExtrinsicBuilder::new_indexed_call(Vec::new()).build();
block_on(pool.submit_one(&BlockId::Number(1), SOURCE, xt)).unwrap();
block_on(pool.submit_one(api.expect_hash_from_number(1), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 2);
// then
@@ -1021,7 +1035,10 @@ mod tests {
let (tx, rx) = std::sync::mpsc::sync_channel(1);
let mut api = TestApi::default();
api.delay = Arc::new(Mutex::new(rx.into()));
let pool = Arc::new(Pool::new(Default::default(), true.into(), api.into()));
let api = Arc::new(api);
let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
let hash_of_block0 = api.expect_hash_from_number(0);
// when
let xt = uxt(Transfer {
@@ -1034,7 +1051,7 @@ mod tests {
// This transaction should go to future, since we use `nonce: 1`
let pool2 = pool.clone();
std::thread::spawn(move || {
block_on(pool2.submit_one(&BlockId::Number(0), SOURCE, xt)).unwrap();
block_on(pool2.submit_one(hash_of_block0, SOURCE, xt)).unwrap();
ready.send(()).unwrap();
});
@@ -1048,12 +1065,13 @@ mod tests {
});
// The tag the above transaction provides (TestApi is using just nonce as u8)
let provides = vec![0_u8];
block_on(pool.submit_one(&BlockId::Number(0), SOURCE, xt)).unwrap();
block_on(pool.submit_one(hash_of_block0, SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
// Now block import happens before the second transaction is able to finish
// verification.
block_on(pool.prune_tags(&BlockId::Number(1), vec![provides], vec![])).unwrap();
block_on(pool.prune_tags(api.expect_hash_from_number(1), vec![provides], vec![]))
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 0);
// so when we release the verification of the previous one it will have
+20 -24
View File
@@ -166,8 +166,11 @@ where
finalized_hash: Block::Hash,
) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
let pool = Arc::new(graph::Pool::new(Default::default(), true.into(), pool_api.clone()));
let (revalidation_queue, background_task) =
revalidation::RevalidationQueue::new_background(pool_api.clone(), pool.clone());
let (revalidation_queue, background_task) = revalidation::RevalidationQueue::new_background(
pool_api.clone(),
pool.clone(),
finalized_hash,
);
(
Self {
api: pool_api,
@@ -203,8 +206,11 @@ where
RevalidationType::Light =>
(revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None),
RevalidationType::Full => {
let (queue, background) =
revalidation::RevalidationQueue::new_background(pool_api.clone(), pool.clone());
let (queue, background) = revalidation::RevalidationQueue::new_background(
pool_api.clone(),
pool.clone(),
finalized_hash,
);
(queue, Some(background))
},
};
@@ -254,46 +260,43 @@ where
fn submit_at(
&self,
at: &BlockId<Self::Block>,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xts: Vec<TransactionFor<Self>>,
) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
let pool = self.pool.clone();
let at = *at;
self.metrics
.report(|metrics| metrics.submitted_transactions.inc_by(xts.len() as u64));
async move { pool.submit_at(&at, source, xts).await }.boxed()
async move { pool.submit_at(at, source, xts).await }.boxed()
}
fn submit_one(
&self,
at: &BlockId<Self::Block>,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xt: TransactionFor<Self>,
) -> PoolFuture<TxHash<Self>, Self::Error> {
let pool = self.pool.clone();
let at = *at;
self.metrics.report(|metrics| metrics.submitted_transactions.inc());
async move { pool.submit_one(&at, source, xt).await }.boxed()
async move { pool.submit_one(at, source, xt).await }.boxed()
}
fn submit_and_watch(
&self,
at: &BlockId<Self::Block>,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xt: TransactionFor<Self>,
) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
let at = *at;
let pool = self.pool.clone();
self.metrics.report(|metrics| metrics.submitted_transactions.inc());
async move {
let watcher = pool.submit_and_watch(&at, source, xt).await?;
let watcher = pool.submit_and_watch(at, source, xt).await?;
Ok(watcher.into_stream().boxed())
}
@@ -433,11 +436,7 @@ where
let validity = self
.api
.validate_transaction_blocking(
&BlockId::hash(at),
TransactionSource::Local,
xt.clone(),
)?
.validate_transaction_blocking(at, TransactionSource::Local, xt.clone())?
.map_err(|e| {
Self::Error::Pool(match e {
TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
@@ -577,10 +576,7 @@ async fn prune_known_txs_for_block<Block: BlockT, Api: graph::ChainApi<Block = B
},
};
if let Err(e) = pool
.prune(&BlockId::Hash(block_hash), &BlockId::hash(*header.parent_hash()), &extrinsics)
.await
{
if let Err(e) = pool.prune(block_hash, *header.parent_hash(), &extrinsics).await {
log::error!("Cannot prune known in the pool: {}", e);
}
@@ -691,7 +687,7 @@ where
if let Err(e) = pool
.resubmit_at(
&BlockId::Hash(*hash),
*hash,
// These transactions are coming from retracted blocks, we should
// simply consider them external.
TransactionSource::External,
@@ -717,7 +713,7 @@ where
if next_action.revalidate {
let hashes = pool.validated_pool().ready().map(|tx| tx.hash).collect();
self.revalidation_queue.revalidate_later(*block_number, hashes).await;
self.revalidation_queue.revalidate_later(*hash, hashes).await;
self.revalidation_strategy.lock().clear();
}
@@ -25,14 +25,12 @@ use std::{
};
use crate::{
graph::{ChainApi, ExtrinsicHash, NumberFor, Pool, ValidatedTransaction},
graph::{BlockHash, ChainApi, ExtrinsicHash, Pool, ValidatedTransaction},
LOG_TARGET,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_runtime::{
generic::BlockId,
traits::{SaturatedConversion, Zero},
transaction_validity::TransactionValidityError,
generic::BlockId, traits::SaturatedConversion, transaction_validity::TransactionValidityError,
};
use futures::prelude::*;
@@ -44,7 +42,7 @@ const MIN_BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20;
/// Payload from queue to worker.
struct WorkerPayload<Api: ChainApi> {
at: NumberFor<Api>,
at: BlockHash<Api>,
transactions: Vec<ExtrinsicHash<Api>>,
}
@@ -54,9 +52,9 @@ struct WorkerPayload<Api: ChainApi> {
struct RevalidationWorker<Api: ChainApi> {
api: Arc<Api>,
pool: Arc<Pool<Api>>,
best_block: NumberFor<Api>,
block_ordered: BTreeMap<NumberFor<Api>, HashSet<ExtrinsicHash<Api>>>,
members: HashMap<ExtrinsicHash<Api>, NumberFor<Api>>,
best_block: BlockHash<Api>,
block_ordered: BTreeMap<BlockHash<Api>, HashSet<ExtrinsicHash<Api>>>,
members: HashMap<ExtrinsicHash<Api>, BlockHash<Api>>,
}
impl<Api: ChainApi> Unpin for RevalidationWorker<Api> {}
@@ -68,15 +66,30 @@ impl<Api: ChainApi> Unpin for RevalidationWorker<Api> {}
async fn batch_revalidate<Api: ChainApi>(
pool: Arc<Pool<Api>>,
api: Arc<Api>,
at: NumberFor<Api>,
at: BlockHash<Api>,
batch: impl IntoIterator<Item = ExtrinsicHash<Api>>,
) {
// This conversion should work. Otherwise, for unknown block the revalidation shall be skipped,
// all the transactions will be kept in the validated pool, and can be scheduled for
// revalidation with the next request.
let block_number = match api.block_id_to_number(&BlockId::Hash(at)) {
Ok(Some(n)) => n,
Ok(None) => {
log::debug!(target: LOG_TARGET, "revalidation skipped at block {at:?}, could not get block number.");
return
},
Err(e) => {
log::debug!(target: LOG_TARGET, "revalidation skipped at block {at:?}: {e:?}.");
return
},
};
let mut invalid_hashes = Vec::new();
let mut revalidated = HashMap::new();
let validation_results = futures::future::join_all(batch.into_iter().filter_map(|ext_hash| {
pool.validated_pool().ready_by_hash(&ext_hash).map(|ext| {
api.validate_transaction(&BlockId::Number(at), ext.source, ext.data.clone())
api.validate_transaction(at, ext.source, ext.data.clone())
.map(move |validation_result| (validation_result, ext_hash, ext))
})
}))
@@ -107,7 +120,7 @@ async fn batch_revalidate<Api: ChainApi>(
revalidated.insert(
ext_hash,
ValidatedTransaction::valid_at(
at.saturated_into::<u64>(),
block_number.saturated_into::<u64>(),
ext_hash,
ext.source,
ext.data.clone(),
@@ -135,13 +148,13 @@ async fn batch_revalidate<Api: ChainApi>(
}
impl<Api: ChainApi> RevalidationWorker<Api> {
fn new(api: Arc<Api>, pool: Arc<Pool<Api>>) -> Self {
fn new(api: Arc<Api>, pool: Arc<Pool<Api>>, best_block: BlockHash<Api>) -> Self {
Self {
api,
pool,
best_block,
block_ordered: Default::default(),
members: Default::default(),
best_block: Zero::zero(),
}
}
@@ -303,10 +316,11 @@ where
api: Arc<Api>,
pool: Arc<Pool<Api>>,
interval: Duration,
best_block: BlockHash<Api>,
) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue", 100_000);
let worker = RevalidationWorker::new(api.clone(), pool.clone());
let worker = RevalidationWorker::new(api.clone(), pool.clone(), best_block);
let queue = Self { api, pool, background: Some(to_worker) };
@@ -317,8 +331,9 @@ where
pub fn new_background(
api: Arc<Api>,
pool: Arc<Pool<Api>>,
best_block: BlockHash<Api>,
) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
Self::new_with_interval(api, pool, BACKGROUND_REVALIDATION_INTERVAL)
Self::new_with_interval(api, pool, BACKGROUND_REVALIDATION_INTERVAL, best_block)
}
/// Queue some transaction for later revalidation.
@@ -328,7 +343,7 @@ where
/// revalidation is actually done.
pub async fn revalidate_later(
&self,
at: NumberFor<Api>,
at: BlockHash<Api>,
transactions: Vec<ExtrinsicHash<Api>>,
) {
if transactions.len() > 0 {
@@ -360,9 +375,8 @@ mod tests {
};
use futures::executor::block_on;
use sc_transaction_pool_api::TransactionSource;
use sp_runtime::generic::BlockId;
use substrate_test_runtime::{AccountId, Transfer, H256};
use substrate_test_runtime_client::AccountKeyring::Alice;
use substrate_test_runtime_client::AccountKeyring::{Alice, Bob};
#[test]
fn revalidation_queue_works() {
@@ -376,18 +390,63 @@ mod tests {
amount: 5,
nonce: 0,
});
let uxt_hash = block_on(pool.submit_one(
&BlockId::number(0),
TransactionSource::External,
uxt.clone(),
))
.expect("Should be valid");
block_on(queue.revalidate_later(0, vec![uxt_hash]));
let hash_of_block0 = api.expect_hash_from_number(0);
let uxt_hash =
block_on(pool.submit_one(hash_of_block0, TransactionSource::External, uxt.clone()))
.expect("Should be valid");
block_on(queue.revalidate_later(hash_of_block0, vec![uxt_hash]));
// revalidated in sync offload 2nd time
assert_eq!(api.validation_requests().len(), 2);
// number of ready
assert_eq!(pool.validated_pool().status().ready, 1);
}
#[test]
fn revalidation_queue_skips_revalidation_for_unknown_block_hash() {
let api = Arc::new(TestApi::default());
let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
let queue = Arc::new(RevalidationQueue::new(api.clone(), pool.clone()));
let uxt0 = uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
});
let uxt1 = uxt(Transfer {
from: Bob.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 4,
nonce: 1,
});
let hash_of_block0 = api.expect_hash_from_number(0);
let unknown_block = H256::repeat_byte(0x13);
let uxt_hashes =
block_on(pool.submit_at(hash_of_block0, TransactionSource::External, vec![uxt0, uxt1]))
.expect("Should be valid")
.into_iter()
.map(|r| r.expect("Should be valid"))
.collect::<Vec<_>>();
assert_eq!(api.validation_requests().len(), 2);
assert_eq!(pool.validated_pool().status().ready, 2);
// revalidation works fine for block 0:
block_on(queue.revalidate_later(hash_of_block0, uxt_hashes.clone()));
assert_eq!(api.validation_requests().len(), 4);
assert_eq!(pool.validated_pool().status().ready, 2);
// revalidation shall be skipped for unknown block:
block_on(queue.revalidate_later(unknown_block, uxt_hashes));
// no revalidation shall be done
assert_eq!(api.validation_requests().len(), 4);
// number of ready shall not change
assert_eq!(pool.validated_pool().status().ready, 2);
}
}
+14 -6
View File
@@ -32,7 +32,7 @@ use sp_runtime::{
};
use std::{collections::HashSet, sync::Arc};
use substrate_test_runtime::{
substrate_test_pallet::pallet::Call as PalletCall, BalancesCall, Block, Extrinsic,
substrate_test_pallet::pallet::Call as PalletCall, BalancesCall, Block, BlockNumber, Extrinsic,
ExtrinsicBuilder, Hashing, RuntimeCall, Transfer, TransferData, H256,
};
@@ -53,6 +53,11 @@ impl TestApi {
pub fn validation_requests(&self) -> Vec<Extrinsic> {
self.validation_requests.lock().clone()
}
/// Helper function for mapping block number to hash. Use if mapping shall not fail.
pub fn expect_hash_from_number(&self, n: BlockNumber) -> H256 {
self.block_id_to_hash(&BlockId::Number(n)).unwrap().unwrap()
}
}
impl ChainApi for TestApi {
@@ -64,13 +69,13 @@ impl ChainApi for TestApi {
/// Verify extrinsic at given block.
fn validate_transaction(
&self,
at: &BlockId<Self::Block>,
at: <Self::Block as BlockT>::Hash,
_source: TransactionSource,
uxt: ExtrinsicFor<Self>,
) -> Self::ValidationFuture {
self.validation_requests.lock().push(uxt.clone());
let hash = self.hash_and_length(&uxt).0;
let block_number = self.block_id_to_number(at).unwrap().unwrap();
let block_number = self.block_id_to_number(&BlockId::Hash(at)).unwrap().unwrap();
let res = match uxt {
Extrinsic {
@@ -153,6 +158,8 @@ impl ChainApi for TestApi {
) -> Result<Option<NumberFor<Self>>, Self::Error> {
Ok(match at {
BlockId::Number(num) => Some(*num),
BlockId::Hash(hash) if *hash == H256::from_low_u64_be(hash.to_low_u64_be()) =>
Some(hash.to_low_u64_be()),
BlockId::Hash(_) => None,
})
}
@@ -164,7 +171,7 @@ impl ChainApi for TestApi {
) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error> {
Ok(match at {
BlockId::Number(num) => Some(H256::from_low_u64_be(*num)).into(),
BlockId::Hash(_) => None,
BlockId::Hash(hash) => Some(*hash),
})
}
@@ -199,6 +206,7 @@ pub(crate) fn uxt(transfer: Transfer) -> Extrinsic {
ExtrinsicBuilder::new_transfer(transfer).build()
}
pub(crate) fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), true.into(), TestApi::default().into())
pub(crate) fn pool() -> (Pool<TestApi>, Arc<TestApi>) {
let api = Arc::new(TestApi::default());
(Pool::new(Default::default(), true.into(), api.clone()), api)
}