Prepare for asynchronous transaction validation in tx pool (#3650)

* async txpool API

* Update core/rpc/src/author/mod.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Update core/transaction-pool/graph/src/pool.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Pool -> Pool + ValidatedPool

* removed lost block_on when importing xt from network

* fix grumbles

* alias for future::Executor in rpc

* removed executor from Author RPCs

* Pool + SharedValidatedPool -> Pool

* fix compilation after merge

* another fix

* another fix
This commit is contained in:
Svyatoslav Nikolsky
2019-10-01 12:14:25 +03:00
committed by GitHub
parent facf31f77e
commit 387c31598d
29 changed files with 912 additions and 497 deletions
+48 -33
View File
@@ -26,7 +26,12 @@ use chain_spec::{RuntimeGenesis, Extension};
use codec::{Decode, Encode, IoReader};
use consensus_common::import_queue::ImportQueue;
use futures::{prelude::*, sync::mpsc};
use futures03::{FutureExt as _, compat::Compat, StreamExt as _, TryStreamExt as _};
use futures03::{
compat::Compat,
future::ready,
FutureExt as _, TryFutureExt as _,
StreamExt as _, TryStreamExt as _,
};
use keystore::{Store as Keystore, KeyStorePtr};
use log::{info, warn};
use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo, DhtEvent};
@@ -913,7 +918,7 @@ where
RpcB: RpcBuilder<Block, Backend, Executor, Api>,
{
use rpc::{chain, state, author, system};
let subscriptions = rpc::Subscriptions::new(task_executor.clone());
let subscriptions = rpc::Subscriptions::new(task_executor);
let chain = rpc_builder.build_chain(subscriptions.clone());
let state = rpc_builder.build_state(subscriptions.clone());
let author = rpc::author::Author::new(
@@ -935,45 +940,54 @@ where
pub(crate) fn maintain_transaction_pool<Api, Backend, Block, Executor, PoolApi>(
id: &BlockId<Block>,
client: &Client<Backend, Executor, Block, Api>,
client: &Arc<Client<Backend, Executor, Block, Api>>,
transaction_pool: &TransactionPool<PoolApi>,
retracted: &[Block::Hash],
) -> error::Result<()> where
) -> error::Result<Box<dyn Future<Item = (), Error = ()> + Send>> where
Block: BlockT<Hash = <Blake2Hasher as primitives::Hasher>::Out>,
Backend: client::backend::Backend<Block, Blake2Hasher>,
Backend: 'static + client::backend::Backend<Block, Blake2Hasher>,
Client<Backend, Executor, Block, Api>: ProvideRuntimeApi,
<Client<Backend, Executor, Block, Api> as ProvideRuntimeApi>::Api: runtime_api::TaggedTransactionQueue<Block>,
Executor: client::CallExecutor<Block, Blake2Hasher>,
PoolApi: txpool::ChainApi<Hash = Block::Hash, Block = Block>,
Executor: 'static + client::CallExecutor<Block, Blake2Hasher>,
PoolApi: 'static + txpool::ChainApi<Hash = Block::Hash, Block = Block>,
Api: 'static,
{
// Put transactions from retracted blocks back into the pool.
for r in retracted {
if let Some(block) = client.block(&BlockId::hash(*r))? {
let extrinsics = block.block.extrinsics();
if let Err(e) = transaction_pool.submit_at(
id,
extrinsics.iter().filter(|e| {
e.is_signed().unwrap_or(false)
}).cloned(),
true
) {
let client_copy = client.clone();
let retracted_transactions = retracted.to_vec().into_iter()
.filter_map(move |hash| client_copy.block(&BlockId::hash(hash)).ok().unwrap_or(None))
.flat_map(|block| block.block.deconstruct().1.into_iter())
.filter(|tx| tx.is_signed().unwrap_or(false));
let resubmit_future = transaction_pool
.submit_at(id, retracted_transactions, true)
.then(|resubmit_result| ready(match resubmit_result {
Ok(_) => Ok(()),
Err(e) => {
warn!("Error re-submitting transactions: {:?}", e);
Ok(())
}
}
}
}))
.compat();
// Avoid calling into runtime if there is nothing to prune from the pool anyway.
if transaction_pool.status().is_empty() {
return Ok(())
return Ok(Box::new(resubmit_future))
}
if let Some(block) = client.block(id)? {
let parent_id = BlockId::hash(*block.block.header().parent_hash());
let extrinsics = block.block.extrinsics();
transaction_pool.prune(id, &parent_id, extrinsics).map_err(|e| format!("{:?}", e))?;
}
let block = client.block(id)?;
Ok(match block {
Some(block) => {
let parent_id = BlockId::hash(*block.block.header().parent_hash());
let prune_future = transaction_pool
.prune(id, &parent_id, block.block.extrinsics())
.boxed()
.compat()
.map_err(|e| { format!("{:?}", e); });
Ok(())
Box::new(resubmit_future.and_then(|_| prune_future))
},
None => Box::new(resubmit_future),
})
}
pub(crate) fn offchain_workers<Api, Backend, Block, Executor, PoolApi>(
@@ -1005,6 +1019,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use futures03::executor::block_on;
use consensus_common::{BlockOrigin, SelectChain};
use substrate_test_runtime_client::{prelude::*, runtime::Transfer};
@@ -1012,7 +1027,7 @@ mod tests {
fn should_remove_transactions_from_the_pool() {
let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain();
let client = Arc::new(client);
let pool = TransactionPool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone()));
let pool = TransactionPool::new(Default::default(), ::transaction_pool::FullChainApi::new(client.clone()));
let transaction = Transfer {
amount: 5,
nonce: 0,
@@ -1022,7 +1037,7 @@ mod tests {
let best = longest_chain.best_chain().unwrap();
// store the transaction in the pool
pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap();
block_on(pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap();
// import the block
let mut builder = client.new_block(Default::default()).unwrap();
@@ -1038,7 +1053,7 @@ mod tests {
&client,
&pool,
&[]
).unwrap();
).unwrap().wait().unwrap();
// then
assert_eq!(pool.status().ready, 0);
@@ -1049,7 +1064,7 @@ mod tests {
fn should_add_reverted_transactions_to_the_pool() {
let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain();
let client = Arc::new(client);
let pool = TransactionPool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone()));
let pool = TransactionPool::new(Default::default(), ::transaction_pool::FullChainApi::new(client.clone()));
let transaction = Transfer {
amount: 5,
nonce: 0,
@@ -1059,7 +1074,7 @@ mod tests {
let best = longest_chain.best_chain().unwrap();
// store the transaction in the pool
pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap();
block_on(pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap();
// import the block
let mut builder = client.new_block(Default::default()).unwrap();
@@ -1076,7 +1091,7 @@ mod tests {
&client,
&pool,
&[]
).unwrap();
).unwrap().wait().unwrap();
// then
assert_eq!(pool.status().ready, 0);
@@ -1094,7 +1109,7 @@ mod tests {
&client,
&pool,
&[block1_hash]
).unwrap();
).unwrap().wait().unwrap();
// then
assert_eq!(pool.status().ready, 1);
+42 -30
View File
@@ -36,8 +36,14 @@ use parking_lot::Mutex;
use client::{runtime_api::BlockT, Client};
use exit_future::Signal;
use futures::prelude::*;
use futures03::stream::{StreamExt as _, TryStreamExt as _};
use network::{NetworkService, NetworkState, specialization::NetworkSpecialization, Event, DhtEvent};
use futures03::{
future::{ready, FutureExt as _, TryFutureExt as _},
stream::{StreamExt as _, TryStreamExt as _},
};
use network::{
NetworkService, NetworkState, specialization::NetworkSpecialization,
Event, DhtEvent, PeerId, ReportHandle,
};
use log::{log, warn, debug, error, Level};
use codec::{Encode, Decode};
use primitives::{Blake2Hasher, H256};
@@ -168,6 +174,7 @@ macro_rules! new_impl {
imports_external_transactions: !$config.roles.is_light(),
pool: transaction_pool.clone(),
client: client.clone(),
executor: Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }),
});
let protocol_id = {
@@ -233,12 +240,13 @@ macro_rules! new_impl {
let txpool = txpool.upgrade();
if let (Some(txpool), Some(client)) = (txpool.as_ref(), wclient.upgrade()) {
$maintain_transaction_pool(
let future = $maintain_transaction_pool(
&BlockId::hash(notification.hash),
&*client,
&client,
&*txpool,
&notification.retracted,
).map_err(|e| warn!("Pool error processing new block: {:?}", e))?;
let _ = to_spawn_tx_.unbounded_send(future);
}
let offchain = offchain.as_ref().and_then(|o| o.upgrade());
@@ -871,6 +879,7 @@ pub struct TransactionPoolAdapter<C, P> {
imports_external_transactions: bool,
pool: Arc<P>,
client: Arc<C>,
executor: TaskExecutor,
}
/// Get transactions for propagation.
@@ -898,7 +907,7 @@ impl<B, H, C, PoolApi, E> network::TransactionPool<H, B> for
TransactionPoolAdapter<C, TransactionPool<PoolApi>>
where
C: network::ClientHandle<B> + Send + Sync,
PoolApi: ChainApi<Block=B, Hash=H, Error=E>,
PoolApi: 'static + ChainApi<Block=B, Hash=H, Error=E>,
B: BlockT,
H: std::hash::Hash + Eq + sr_primitives::traits::Member + sr_primitives::traits::MaybeSerialize,
E: txpool::error::IntoPoolError + From<txpool::error::Error>,
@@ -907,38 +916,40 @@ where
transactions_to_propagate(&self.pool)
}
fn import(&self, transaction: &<B as BlockT>::Extrinsic) -> Option<H> {
fn hash_of(&self, transaction: &B::Extrinsic) -> H {
self.pool.hash_of(transaction)
}
fn import(&self, report_handle: ReportHandle, who: PeerId, reputation_change: i32, transaction: B::Extrinsic) {
if !self.imports_external_transactions {
debug!("Transaction rejected");
return None;
return;
}
let encoded = transaction.encode();
match Decode::decode(&mut &encoded[..]) {
Ok(uxt) => {
let best_block_id = BlockId::hash(self.client.info().chain.best_hash);
match self.pool.submit_one(&best_block_id, uxt) {
Ok(hash) => Some(hash),
Err(e) => match e.into_pool_error() {
Ok(txpool::error::Error::AlreadyImported(hash)) => {
hash.downcast::<H>().ok()
.map(|x| x.as_ref().clone())
},
Ok(e) => {
debug!("Error adding transaction to the pool: {:?}", e);
None
},
Err(e) => {
debug!("Error converting pool error: {:?}", e);
None
},
}
let import_future = self.pool.submit_one(&best_block_id, uxt);
let import_future = import_future
.then(move |import_result| {
match import_result {
Ok(_) => report_handle.report_peer(who, reputation_change),
Err(e) => match e.into_pool_error() {
Ok(txpool::error::Error::AlreadyImported(_)) => (),
Ok(e) => debug!("Error adding transaction to the pool: {:?}", e),
Err(e) => debug!("Error converting pool error: {:?}", e),
}
}
ready(Ok(()))
})
.compat();
if let Err(e) = self.executor.execute(Box::new(import_future)) {
warn!("Error scheduling extrinsic import: {:?}", e);
}
}
Err(e) => {
debug!("Error decoding transaction {}", e);
None
}
Err(e) => debug!("Error decoding transaction {}", e),
}
}
@@ -950,6 +961,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use futures03::executor::block_on;
use consensus_common::SelectChain;
use sr_primitives::traits::BlindCheckable;
use substrate_test_runtime_client::{prelude::*, runtime::{Extrinsic, Transfer}};
@@ -961,7 +973,7 @@ mod tests {
let client = Arc::new(client);
let pool = Arc::new(TransactionPool::new(
Default::default(),
transaction_pool::ChainApi::new(client.clone())
transaction_pool::FullChainApi::new(client.clone())
));
let best = longest_chain.best_chain().unwrap();
let transaction = Transfer {
@@ -970,8 +982,8 @@ mod tests {
from: AccountKeyring::Alice.into(),
to: Default::default(),
}.into_signed_tx();
pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap();
pool.submit_one(&BlockId::hash(best.hash()), Extrinsic::IncludeData(vec![1])).unwrap();
block_on(pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap();
block_on(pool.submit_one(&BlockId::hash(best.hash()), Extrinsic::IncludeData(vec![1]))).unwrap();
assert_eq!(pool.status().ready, 2);
// when