Async/await in transaction-graph (#4645)

* async/await in tx graph

* review notes

* remove unused typedef
This commit is contained in:
Nikolay Volf
2020-01-24 05:22:39 -08:00
committed by GitHub
parent b7887df104
commit 1614ce0996
3 changed files with 144 additions and 156 deletions
@@ -27,7 +27,6 @@ use serde::Serialize;
use futures::{
Future, FutureExt,
channel::mpsc,
future::{Either, ready, join_all},
};
use sp_runtime::{
generic::BlockId,
@@ -132,8 +131,8 @@ impl<B: ChainApi> Pool<B> {
}
/// Imports a bunch of unverified extrinsics to the pool
pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T, force: bool)
-> impl Future<Output=Result<Vec<Result<ExHash<B>, B::Error>>, B::Error>>
pub async fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T, force: bool)
-> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error>
where
T: IntoIterator<Item=ExtrinsicFor<B>>
{
@@ -143,48 +142,43 @@ impl<B: ChainApi> Pool<B> {
.map(|validated_transactions| validated_pool.submit(validated_transactions
.into_iter()
.map(|(_, tx)| tx))))
.await
}
/// Imports one unverified extrinsic to the pool
pub fn submit_one(
pub async fn submit_one(
&self,
at: &BlockId<B::Block>,
xt: ExtrinsicFor<B>,
) -> impl Future<Output=Result<ExHash<B>, B::Error>> {
) -> Result<ExHash<B>, B::Error> {
self.submit_at(at, std::iter::once(xt), false)
.map(|import_result| import_result.and_then(|mut import_result| import_result
.pop()
.expect("One extrinsic passed; one result returned; qed")
))
.await
}
/// Import a single extrinsic and starts to watch their progress in the pool.
pub fn submit_and_watch(
pub async fn submit_and_watch(
&self,
at: &BlockId<B::Block>,
xt: ExtrinsicFor<B>,
) -> impl Future<Output=Result<Watcher<ExHash<B>, BlockHash<B>>, B::Error>> {
let block_number = match self.resolve_block_number(at) {
Ok(block_number) => block_number,
Err(err) => return Either::Left(ready(Err(err)))
};
let validated_pool = self.validated_pool.clone();
Either::Right(
self.verify_one(at, block_number, xt, false)
.map(move |validated_transactions| validated_pool.submit_and_watch(validated_transactions.1))
)
) -> Result<Watcher<ExHash<B>, BlockHash<B>>, B::Error> {
let block_number = self.resolve_block_number(at)?;
let (_, tx) = self.verify_one(at, block_number, xt, false).await;
self.validated_pool.submit_and_watch(tx)
}
/// Revalidate all ready transactions.
///
/// Returns future that performs validation of all ready transactions and
/// then resubmits all transactions back to the pool.
pub fn revalidate_ready(
pub async fn revalidate_ready(
&self,
at: &BlockId<B::Block>,
max: Option<usize>,
) -> impl Future<Output=Result<(), B::Error>> {
) -> Result<(), B::Error> {
use std::time::Instant;
log::debug!(target: "txpool",
"Fetching ready transactions (up to: {})",
@@ -196,23 +190,20 @@ impl<B: ChainApi> Pool<B> {
.take(max.unwrap_or_else(usize::max_value));
let now = Instant::now();
self.verify(at, ready, false)
.map(move |revalidated_transactions| {
log::debug!(target: "txpool",
"Re-verified transactions, took {} ms. Resubmitting.",
now.elapsed().as_millis()
);
let now = Instant::now();
let res = revalidated_transactions.map(
|revalidated_transactions| validated_pool.resubmit(revalidated_transactions)
);
log::debug!(target: "txpool",
"Resubmitted. Took {} ms. Status: {:?}",
now.elapsed().as_millis(),
validated_pool.status()
);
res
})
let revalidated_transactions = self.verify(at, ready, false).await?;
log::debug!(target: "txpool",
"Re-verified transactions, took {} ms. Resubmitting.",
now.elapsed().as_millis()
);
let now = Instant::now();
self.validated_pool.resubmit(revalidated_transactions);
log::debug!(target: "txpool",
"Resubmitted. Took {} ms. Status: {:?}",
now.elapsed().as_millis(),
validated_pool.status()
);
Ok(())
}
/// Prunes known ready transactions.
@@ -238,12 +229,12 @@ impl<B: ChainApi> Pool<B> {
/// To perform pruning we need the tags that each extrinsic provides and to avoid calling
/// into runtime too often we first lookup all extrinsics that are in the pool and get
/// their provided tags from there. Otherwise we query the runtime at the `parent` block.
pub fn prune(
pub async fn prune(
&self,
at: &BlockId<B::Block>,
parent: &BlockId<B::Block>,
extrinsics: &[ExtrinsicFor<B>],
) -> impl Future<Output=Result<(), B::Error>> {
) -> Result<(), B::Error> {
log::debug!(
target: "txpool",
"Starting pruning of block {:?} (extrinsics: {})",
@@ -257,34 +248,26 @@ impl<B: ChainApi> Pool<B> {
// Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option<Vec<Tag>>)`)
let all = extrinsics.iter().zip(in_pool_tags.into_iter());
// Prepare future that collect tags for all extrinsics
let future_tags = join_all(all
.map(|(extrinsic, in_pool_tags)|
match in_pool_tags {
// reuse the tags for extrinsics that were found in the pool
Some(tags) => Either::Left(
ready(tags)
),
// if it's not found in the pool query the runtime at parent block
// to get validity info and tags that the extrinsic provides.
None => Either::Right(self.validated_pool.api().validate_transaction(parent, extrinsic.clone())
.then(|validity| ready(match validity {
Ok(Ok(validity)) => validity.provides,
// silently ignore invalid extrinsics,
// cause they might just be inherent
_ => Vec::new(),
}))),
}
));
let mut future_tags = Vec::new();
for (extrinsic, in_pool_tags) in all {
match in_pool_tags {
// reuse the tags for extrinsics that were found in the pool
Some(tags) => future_tags.extend(tags),
// if it's not found in the pool query the runtime at parent block
// to get validity info and tags that the extrinsic provides.
None => {
let validity = self.validated_pool.api()
.validate_transaction(parent, extrinsic.clone())
.await;
// Prune transactions by tags
let at = at.clone();
let self_clone = self.clone();
future_tags.then(move |tags| self_clone.prune_tags(
&at,
tags.into_iter().flat_map(|tags| tags),
in_pool_hashes,
))
if let Ok(Ok(validity)) = validity {
future_tags.extend(validity.provides);
}
},
}
}
self.prune_tags(at, future_tags, in_pool_hashes).await
}
/// Prunes ready transactions that provide given list of tags.
@@ -308,17 +291,17 @@ impl<B: ChainApi> Pool<B> {
/// the second parameter of `known_imported_hashes`. These transactions
/// (if pruned) are not revalidated and become temporarily banned to
/// prevent importing them in the (near) future.
pub fn prune_tags(
pub async fn prune_tags(
&self,
at: &BlockId<B::Block>,
tags: impl IntoIterator<Item=Tag>,
known_imported_hashes: impl IntoIterator<Item=ExHash<B>> + Clone,
) -> impl Future<Output=Result<(), B::Error>> {
) -> Result<(), B::Error> {
log::debug!(target: "txpool", "Pruning at {:?}", at);
// Prune all transactions that provide given tags
let prune_status = match self.validated_pool.prune_tags(tags) {
Ok(prune_status) => prune_status,
Err(e) => return Either::Left(ready(Err(e))),
Err(e) => return Err(e),
};
// Make sure that we don't revalidate extrinsics that were part of the recently
@@ -330,21 +313,18 @@ impl<B: ChainApi> Pool<B> {
// note that `known_imported_hashes` will be rejected here due to temporary ban.
let pruned_hashes = prune_status.pruned.iter().map(|tx| tx.hash.clone()).collect::<Vec<_>>();
let pruned_transactions = prune_status.pruned.into_iter().map(|tx| tx.data.clone());
let reverify_future = self.verify(at, pruned_transactions, false);
let reverified_transactions = self.verify(at, pruned_transactions, false).await?;
log::trace!(target: "txpool", "Prunning at {:?}. Resubmitting transactions.", at);
// And finally - submit reverified transactions back to the pool
let at = at.clone();
let validated_pool = self.validated_pool.clone();
Either::Right(reverify_future.then(move |reverified_transactions|
ready(reverified_transactions.and_then(|reverified_transactions|
validated_pool.resubmit_pruned(
&at,
known_imported_hashes,
pruned_hashes,
reverified_transactions.into_iter().map(|(_, xt)| xt).collect(),
))
)))
self.validated_pool.resubmit_pruned(
&at,
known_imported_hashes,
pruned_hashes,
reverified_transactions.into_iter().map(|(_, xt)| xt).collect(),
)
}
/// Return an event stream of notifications for when transactions are imported to the pool.
@@ -388,69 +368,74 @@ impl<B: ChainApi> Pool<B> {
}
/// Returns future that validates a bunch of transactions at given block.
fn verify(
async fn verify(
&self,
at: &BlockId<B::Block>,
xts: impl IntoIterator<Item=ExtrinsicFor<B>>,
force: bool,
) -> impl Future<Output=Result<HashMap<ExHash<B>, ValidatedTransactionFor<B>>, B::Error>> {
) -> Result<HashMap<ExHash<B>, ValidatedTransactionFor<B>>, B::Error> {
// we need a block number to compute tx validity
let block_number = match self.resolve_block_number(at) {
Ok(block_number) => block_number,
Err(err) => return Either::Left(ready(Err(err))),
};
let block_number = self.resolve_block_number(at)?;
let mut result = HashMap::new();
// for each xt, prepare a validation future
let validation_futures = xts.into_iter().map(move |xt|
self.verify_one(at, block_number, xt, force)
);
for xt in xts {
let (hash, validated_tx) = self.verify_one(at, block_number, xt, force).await;
result.insert(hash, validated_tx);
}
// make single validation future that waits all until all extrinsics are validated
Either::Right(join_all(validation_futures).then(|x| ready(Ok(x.into_iter().collect()))))
Ok(result)
}
/// Returns future that validates single transaction at given block.
fn verify_one(
async fn verify_one(
&self,
block_id: &BlockId<B::Block>,
block_number: NumberFor<B>,
xt: ExtrinsicFor<B>,
force: bool,
) -> impl Future<Output=(ExHash<B>, ValidatedTransactionFor<B>)> {
) -> (ExHash<B>, ValidatedTransactionFor<B>) {
let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
if !force && self.validated_pool.is_banned(&hash) {
return Either::Left(ready((
return (
hash.clone(),
ValidatedTransaction::Invalid(hash, error::Error::TemporarilyBanned.into()),
)))
)
}
Either::Right(self.validated_pool.api().validate_transaction(block_id, xt.clone())
.then(move |validation_result| ready((hash.clone(), match validation_result {
Ok(validity) => match validity {
Ok(validity) => if validity.provides.is_empty() {
ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
} else {
ValidatedTransaction::Valid(base::Transaction {
data: xt,
bytes,
hash,
priority: validity.priority,
requires: validity.requires,
provides: validity.provides,
propagate: validity.propagate,
valid_till: block_number
.saturated_into::<u64>()
.saturating_add(validity.longevity),
})
},
Err(TransactionValidityError::Invalid(e)) =>
ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into()),
Err(TransactionValidityError::Unknown(e)) =>
ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into()),
},
Err(e) => ValidatedTransaction::Invalid(hash, e),
}))))
let validation_result = self.validated_pool.api().validate_transaction(block_id, xt.clone()).await;
let status = match validation_result {
Ok(status) => status,
Err(e) => return (hash.clone(), ValidatedTransaction::Invalid(hash, e)),
};
let validity = match status {
Ok(validity) => {
if validity.provides.is_empty() {
ValidatedTransaction::Invalid(hash.clone(), error::Error::NoTagsProvided.into())
} else {
ValidatedTransaction::Valid(base::Transaction {
data: xt,
bytes,
hash: hash.clone(),
priority: validity.priority,
requires: validity.requires,
provides: validity.provides,
propagate: validity.propagate,
valid_till: block_number
.saturated_into::<u64>()
.saturating_add(validity.longevity),
})
}
},
Err(TransactionValidityError::Invalid(e)) =>
ValidatedTransaction::Invalid(hash.clone(), error::Error::InvalidTransaction(e).into()),
Err(TransactionValidityError::Unknown(e)) =>
ValidatedTransaction::Unknown(hash.clone(), error::Error::UnknownTransaction(e).into()),
};
(hash, validity)
}
}
+24 -10
View File
@@ -39,9 +39,11 @@ use sp_runtime::{
use sp_transaction_pool::{
TransactionPool, PoolStatus, ImportNotificationStream,
TxHash, TransactionFor, TransactionStatusStreamFor, BlockHash,
MaintainedTransactionPool,
MaintainedTransactionPool, PoolFuture,
};
type PoolResult<T> = PoolFuture<T, error::Error>;
/// Basic implementation of transaction pool that can be customized by providing PoolApi.
pub struct BasicPool<PoolApi, Block>
where
@@ -129,28 +131,40 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
fn submit_at(
&self,
at: &BlockId<Self::Block>,
xts: impl IntoIterator<Item=TransactionFor<Self>> + 'static,
) -> Box<dyn Future<Output=Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error>> + Send + Unpin> {
Box::new(self.pool.submit_at(at, xts, false))
xts: Vec<TransactionFor<Self>>,
) -> PoolResult<Vec<Result<TxHash<Self>, Self::Error>>> {
let pool = self.pool.clone();
let at = *at;
async move {
pool.submit_at(&at, xts, false).await
}.boxed()
}
fn submit_one(
&self,
at: &BlockId<Self::Block>,
xt: TransactionFor<Self>,
) -> Box<dyn Future<Output=Result<TxHash<Self>, Self::Error>> + Send + Unpin> {
Box::new(self.pool.submit_one(at, xt))
) -> PoolResult<TxHash<Self>> {
let pool = self.pool.clone();
let at = *at;
async move {
pool.submit_one(&at, xt).await
}.boxed()
}
fn submit_and_watch(
&self,
at: &BlockId<Self::Block>,
xt: TransactionFor<Self>,
) -> Box<dyn Future<Output=Result<Box<TransactionStatusStreamFor<Self>>, Self::Error>> + Send + Unpin> {
Box::new(
self.pool.submit_and_watch(at, xt)
) -> PoolResult<Box<TransactionStatusStreamFor<Self>>> {
let at = *at;
let pool = self.pool.clone();
async move {
pool.submit_and_watch(&at, xt)
.map(|result| result.map(|watcher| Box::new(watcher.into_stream()) as _))
)
.await
}.boxed()
}
fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
@@ -124,6 +124,9 @@ pub type TransactionFor<P> = <<P as TransactionPool>::Block as BlockT>::Extrinsi
/// Type of transactions event stream for a pool.
pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, BlockHash<P>>;
/// Typical future type used in transaction pool api.
pub type PoolFuture<T, E> = std::pin::Pin<Box<dyn Future<Output=Result<T, E>> + Send>>;
/// In-pool transaction interface.
///
/// The pool is container of transactions that are implementing this trait.
@@ -170,55 +173,41 @@ pub trait TransactionPool: Send + Sync {
fn submit_at(
&self,
at: &BlockId<Self::Block>,
xts: impl IntoIterator<Item=TransactionFor<Self>> + 'static,
) -> Box<dyn Future<Output=Result<
Vec<Result<TxHash<Self>, Self::Error>>,
Self::Error
>> + Send + Unpin>;
xts: Vec<TransactionFor<Self>>,
) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error>;
/// Returns a future that imports one unverified transaction to the pool.
fn submit_one(
&self,
at: &BlockId<Self::Block>,
xt: TransactionFor<Self>,
) -> Box<dyn Future<Output=Result<
TxHash<Self>,
Self::Error
>> + Send + Unpin>;
// RPC
) -> PoolFuture<TxHash<Self>, Self::Error>;
// *** RPC
/// Returns a future that import a single transaction and starts to watch their progress in the pool.
fn submit_and_watch(
&self,
at: &BlockId<Self::Block>,
xt: TransactionFor<Self>,
) -> Box<dyn Future<Output=Result<Box<TransactionStatusStreamFor<Self>>, Self::Error>> + Send + Unpin>;
// Block production / Networking
) -> PoolFuture<Box<TransactionStatusStreamFor<Self>>, Self::Error>;
// *** Block production / Networking
/// Get an iterator for ready transactions ordered by priority
fn ready(&self) -> Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>>>;
// Block production
// *** Block production
/// Remove transactions identified by given hashes (and dependent transactions) from the pool.
fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>>;
// logging
// *** logging
/// Returns pool status.
fn status(&self) -> PoolStatus;
// logging / RPC / networking
// *** logging / RPC / networking
/// Return an event stream of transactions imported to the pool.
fn import_notification_stream(&self) -> ImportNotificationStream;
// networking
// *** networking
/// Notify the pool about transactions broadcast.
fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>);