mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 23:31:07 +00:00
transaction-pool: expose blocking api for tx submission (#6325)
* transaction-pool: expose blocking api for tx submission * service: separate ServiceBuilder::build for full and light * service: add ServiceBuilder::build_common * transaction-pool: extend docs Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com> Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
This commit is contained in:
@@ -111,7 +111,7 @@ pub fn new_full(config: Configuration) -> Result<impl AbstractService, ServiceEr
|
||||
let provider = client as Arc<dyn StorageAndProofProvider<_, _>>;
|
||||
Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, provider)) as _)
|
||||
})?
|
||||
.build()?;
|
||||
.build_full()?;
|
||||
|
||||
if role.is_authority() {
|
||||
let proposer = sc_basic_authorship::ProposerFactory::new(
|
||||
@@ -264,5 +264,5 @@ pub fn new_light(config: Configuration) -> Result<impl AbstractService, ServiceE
|
||||
let provider = client as Arc<dyn StorageAndProofProvider<_, _>>;
|
||||
Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, provider)) as _)
|
||||
})?
|
||||
.build()
|
||||
.build_light()
|
||||
}
|
||||
|
||||
@@ -179,7 +179,7 @@ macro_rules! new_full {
|
||||
let provider = client as Arc<dyn grandpa::StorageAndProofProvider<_, _>>;
|
||||
Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, provider)) as _)
|
||||
})?
|
||||
.build()?;
|
||||
.build_full()?;
|
||||
|
||||
let (block_import, grandpa_link, babe_link) = import_setup.take()
|
||||
.expect("Link Half and Block Import are present for Full Services or setup failed before. qed");
|
||||
@@ -405,7 +405,7 @@ pub fn new_light(config: Configuration)
|
||||
|
||||
Ok(node_rpc::create_light(light_deps))
|
||||
})?
|
||||
.build()?;
|
||||
.build_light()?;
|
||||
|
||||
Ok(service)
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ use std::{
|
||||
};
|
||||
use wasm_timer::SystemTime;
|
||||
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
|
||||
use sp_transaction_pool::MaintainedTransactionPool;
|
||||
use sp_transaction_pool::{LocalTransactionPool, MaintainedTransactionPool};
|
||||
use prometheus_endpoint::Registry;
|
||||
use sc_client_db::{Backend, DatabaseSettings};
|
||||
use sp_core::traits::CodeExecutor;
|
||||
@@ -959,8 +959,7 @@ ServiceBuilder<
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
/// Builds the service.
|
||||
pub fn build(self) -> Result<Service<
|
||||
fn build_common(self) -> Result<Service<
|
||||
TBl,
|
||||
Client<TBackend, TExec, TBl, TRtApi>,
|
||||
TSc,
|
||||
@@ -1016,10 +1015,6 @@ ServiceBuilder<
|
||||
"best" => ?chain_info.best_hash
|
||||
);
|
||||
|
||||
// make transaction pool available for off-chain runtime calls.
|
||||
client.execution_extensions()
|
||||
.register_transaction_pool(Arc::downgrade(&transaction_pool) as _);
|
||||
|
||||
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
|
||||
imports_external_transactions: !matches!(config.role, Role::Light),
|
||||
pool: transaction_pool.clone(),
|
||||
@@ -1421,4 +1416,82 @@ ServiceBuilder<
|
||||
_base_path: config.base_path.map(Arc::new),
|
||||
})
|
||||
}
|
||||
|
||||
/// Builds the light service.
|
||||
pub fn build_light(self) -> Result<Service<
|
||||
TBl,
|
||||
Client<TBackend, TExec, TBl, TRtApi>,
|
||||
TSc,
|
||||
NetworkStatus<TBl>,
|
||||
NetworkService<TBl, <TBl as BlockT>::Hash>,
|
||||
TExPool,
|
||||
sc_offchain::OffchainWorkers<
|
||||
Client<TBackend, TExec, TBl, TRtApi>,
|
||||
TBackend::OffchainStorage,
|
||||
TBl
|
||||
>,
|
||||
>, Error>
|
||||
where TExec: CallExecutor<TBl, Backend = TBackend>,
|
||||
{
|
||||
self.build_common()
|
||||
}
|
||||
}
|
||||
|
||||
impl<TBl, TRtApi, TBackend, TExec, TSc, TImpQu, TExPool, TRpc>
|
||||
ServiceBuilder<
|
||||
TBl,
|
||||
TRtApi,
|
||||
Client<TBackend, TExec, TBl, TRtApi>,
|
||||
Arc<OnDemand<TBl>>,
|
||||
TSc,
|
||||
TImpQu,
|
||||
BoxFinalityProofRequestBuilder<TBl>,
|
||||
Arc<dyn FinalityProofProvider<TBl>>,
|
||||
TExPool,
|
||||
TRpc,
|
||||
TBackend,
|
||||
> where
|
||||
Client<TBackend, TExec, TBl, TRtApi>: ProvideRuntimeApi<TBl>,
|
||||
<Client<TBackend, TExec, TBl, TRtApi> as ProvideRuntimeApi<TBl>>::Api:
|
||||
sp_api::Metadata<TBl> +
|
||||
sc_offchain::OffchainWorkerApi<TBl> +
|
||||
sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl> +
|
||||
sp_session::SessionKeys<TBl> +
|
||||
sp_api::ApiErrorExt<Error = sp_blockchain::Error> +
|
||||
sp_api::ApiExt<TBl, StateBackend = TBackend::State>,
|
||||
TBl: BlockT,
|
||||
TRtApi: 'static + Send + Sync,
|
||||
TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
|
||||
TExec: 'static + CallExecutor<TBl> + Send + Sync + Clone,
|
||||
TSc: Clone,
|
||||
TImpQu: 'static + ImportQueue<TBl>,
|
||||
TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> +
|
||||
LocalTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> +
|
||||
MallocSizeOfWasm +
|
||||
'static,
|
||||
TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata>,
|
||||
{
|
||||
|
||||
/// Builds the full service.
|
||||
pub fn build_full(self) -> Result<Service<
|
||||
TBl,
|
||||
Client<TBackend, TExec, TBl, TRtApi>,
|
||||
TSc,
|
||||
NetworkStatus<TBl>,
|
||||
NetworkService<TBl, <TBl as BlockT>::Hash>,
|
||||
TExPool,
|
||||
sc_offchain::OffchainWorkers<
|
||||
Client<TBackend, TExec, TBl, TRtApi>,
|
||||
TBackend::OffchainStorage,
|
||||
TBl
|
||||
>,
|
||||
>, Error>
|
||||
where TExec: CallExecutor<TBl, Backend = TBackend>,
|
||||
{
|
||||
// make transaction pool available for off-chain runtime calls.
|
||||
self.client.execution_extensions()
|
||||
.register_transaction_pool(Arc::downgrade(&self.transaction_pool) as _);
|
||||
|
||||
self.build_common()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,29 +87,15 @@ where
|
||||
let client = self.client.clone();
|
||||
let at = at.clone();
|
||||
|
||||
self.pool.spawn_ok(futures_diagnose::diagnose("validate-transaction", async move {
|
||||
sp_tracing::enter_span!("validate_transaction");
|
||||
let runtime_api = client.runtime_api();
|
||||
let has_v2 = sp_tracing::tracing_span! { "check_version";
|
||||
runtime_api
|
||||
.has_api_with::<dyn TaggedTransactionQueue<Self::Block, Error=()>, _>(
|
||||
&at, |v| v >= 2,
|
||||
)
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
sp_tracing::enter_span!("runtime::validate_transaction");
|
||||
let res = if has_v2 {
|
||||
runtime_api.validate_transaction(&at, source, uxt)
|
||||
} else {
|
||||
#[allow(deprecated)] // old validate_transaction
|
||||
runtime_api.validate_transaction_before_version_2(&at, uxt)
|
||||
};
|
||||
let res = res.map_err(|e| Error::RuntimeApi(e.to_string()));
|
||||
if let Err(e) = tx.send(res) {
|
||||
log::warn!("Unable to send a validate transaction result: {:?}", e);
|
||||
}
|
||||
}));
|
||||
self.pool.spawn_ok(futures_diagnose::diagnose(
|
||||
"validate-transaction",
|
||||
async move {
|
||||
let res = validate_transaction_blocking(&*client, &at, source, uxt);
|
||||
if let Err(e) = tx.send(res) {
|
||||
log::warn!("Unable to send a validate transaction result: {:?}", e);
|
||||
}
|
||||
},
|
||||
));
|
||||
|
||||
Box::pin(async move {
|
||||
match rx.await {
|
||||
@@ -143,6 +129,62 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function to validate a transaction using a full chain API.
|
||||
/// This method will call into the runtime to perform the validation.
|
||||
fn validate_transaction_blocking<Client, Block>(
|
||||
client: &Client,
|
||||
at: &BlockId<Block>,
|
||||
source: TransactionSource,
|
||||
uxt: sc_transaction_graph::ExtrinsicFor<FullChainApi<Client, Block>>,
|
||||
) -> error::Result<TransactionValidity>
|
||||
where
|
||||
Block: BlockT,
|
||||
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
|
||||
Client: Send + Sync + 'static,
|
||||
Client::Api: TaggedTransactionQueue<Block>,
|
||||
sp_api::ApiErrorFor<Client, Block>: Send + std::fmt::Display,
|
||||
{
|
||||
sp_tracing::enter_span!("validate_transaction");
|
||||
let runtime_api = client.runtime_api();
|
||||
let has_v2 = sp_tracing::tracing_span! { "check_version";
|
||||
runtime_api
|
||||
.has_api_with::<dyn TaggedTransactionQueue<Block, Error=()>, _>(&at, |v| v >= 2)
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
sp_tracing::enter_span!("runtime::validate_transaction");
|
||||
let res = if has_v2 {
|
||||
runtime_api.validate_transaction(&at, source, uxt)
|
||||
} else {
|
||||
#[allow(deprecated)] // old validate_transaction
|
||||
runtime_api.validate_transaction_before_version_2(&at, uxt)
|
||||
};
|
||||
|
||||
res.map_err(|e| Error::RuntimeApi(e.to_string()))
|
||||
}
|
||||
|
||||
impl<Client, Block> FullChainApi<Client, Block>
|
||||
where
|
||||
Block: BlockT,
|
||||
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
|
||||
Client: Send + Sync + 'static,
|
||||
Client::Api: TaggedTransactionQueue<Block>,
|
||||
sp_api::ApiErrorFor<Client, Block>: Send + std::fmt::Display,
|
||||
{
|
||||
/// Validates a transaction by calling into the runtime, same as
|
||||
/// `validate_transaction` but blocks the current thread when performing
|
||||
/// validation. Only implemented for `FullChainApi` since we can call into
|
||||
/// the runtime locally.
|
||||
pub fn validate_transaction_blocking(
|
||||
&self,
|
||||
at: &BlockId<Block>,
|
||||
source: TransactionSource,
|
||||
uxt: sc_transaction_graph::ExtrinsicFor<Self>,
|
||||
) -> error::Result<TransactionValidity> {
|
||||
validate_transaction_blocking(&*self.client, at, source, uxt)
|
||||
}
|
||||
}
|
||||
|
||||
/// The transaction pool logic for light client.
|
||||
pub struct LightChainApi<Client, F, Block> {
|
||||
client: Arc<Client>,
|
||||
|
||||
@@ -352,6 +352,59 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
|
||||
}
|
||||
}
|
||||
|
||||
impl<Block, Client> sp_transaction_pool::LocalTransactionPool
|
||||
for BasicPool<FullChainApi<Client, Block>, Block>
|
||||
where
|
||||
Block: BlockT,
|
||||
Client: sp_api::ProvideRuntimeApi<Block>
|
||||
+ sc_client_api::BlockBackend<Block>
|
||||
+ sp_runtime::traits::BlockIdTo<Block>,
|
||||
Client: Send + Sync + 'static,
|
||||
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
|
||||
sp_api::ApiErrorFor<Client, Block>: Send + std::fmt::Display,
|
||||
{
|
||||
type Block = Block;
|
||||
type Hash = sc_transaction_graph::ExtrinsicHash<FullChainApi<Client, Block>>;
|
||||
type Error = <FullChainApi<Client, Block> as ChainApi>::Error;
|
||||
|
||||
fn submit_local(
|
||||
&self,
|
||||
at: &BlockId<Self::Block>,
|
||||
xt: sp_transaction_pool::LocalTransactionFor<Self>,
|
||||
) -> Result<Self::Hash, Self::Error> {
|
||||
use sc_transaction_graph::ValidatedTransaction;
|
||||
use sp_runtime::traits::SaturatedConversion;
|
||||
use sp_runtime::transaction_validity::TransactionValidityError;
|
||||
|
||||
let validity = self
|
||||
.api
|
||||
.validate_transaction_blocking(at, TransactionSource::Local, xt.clone())?
|
||||
.map_err(|e| {
|
||||
Self::Error::Pool(match e {
|
||||
TransactionValidityError::Invalid(i) => i.into(),
|
||||
TransactionValidityError::Unknown(u) => u.into(),
|
||||
})
|
||||
})?;
|
||||
|
||||
let (hash, bytes) = self.pool.validated_pool().api().hash_and_length(&xt);
|
||||
let block_number = self
|
||||
.api
|
||||
.block_id_to_number(at)?
|
||||
.ok_or_else(|| error::Error::BlockIdConversion(format!("{:?}", at)))?;
|
||||
|
||||
let validated = ValidatedTransaction::valid_at(
|
||||
block_number.saturated_into::<u64>(),
|
||||
hash.clone(),
|
||||
TransactionSource::Local,
|
||||
xt,
|
||||
bytes,
|
||||
validity,
|
||||
);
|
||||
|
||||
self.pool.validated_pool().submit(vec![validated]).remove(0)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(test, derive(Debug))]
|
||||
enum RevalidationStatus<N> {
|
||||
/// The revalidation has never been completed.
|
||||
|
||||
@@ -141,6 +141,8 @@ pub type BlockHash<P> = <<P as TransactionPool>::Block as BlockT>::Hash;
|
||||
pub type TransactionFor<P> = <<P as TransactionPool>::Block as BlockT>::Extrinsic;
|
||||
/// Type of transactions event stream for a pool.
|
||||
pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, BlockHash<P>>;
|
||||
/// Transaction type for a local pool.
|
||||
pub type LocalTransactionFor<P> = <<P as LocalTransactionPool>::Block as BlockT>::Extrinsic;
|
||||
|
||||
/// Typical future type used in transaction pool api.
|
||||
pub type PoolFuture<T, E> = std::pin::Pin<Box<dyn Future<Output=Result<T, E>> + Send>>;
|
||||
@@ -273,6 +275,28 @@ pub trait MaintainedTransactionPool: TransactionPool {
|
||||
fn maintain(&self, event: ChainEvent<Self::Block>) -> Pin<Box<dyn Future<Output=()> + Send>>;
|
||||
}
|
||||
|
||||
/// Transaction pool interface for submitting local transactions that exposes a
|
||||
/// blocking interface for submission.
|
||||
pub trait LocalTransactionPool: Send + Sync {
|
||||
/// Block type.
|
||||
type Block: BlockT;
|
||||
/// Transaction hash type.
|
||||
type Hash: Hash + Eq + Member + Serialize;
|
||||
/// Error type.
|
||||
type Error: From<crate::error::Error> + crate::error::IntoPoolError;
|
||||
|
||||
/// Submits the given local unverified transaction to the pool blocking the
|
||||
/// current thread for any necessary pre-verification.
|
||||
/// NOTE: It MUST NOT be used for transactions that originate from the
|
||||
/// network or RPC, since the validation is performed with
|
||||
/// `TransactionSource::Local`.
|
||||
fn submit_local(
|
||||
&self,
|
||||
at: &BlockId<Self::Block>,
|
||||
xt: LocalTransactionFor<Self>,
|
||||
) -> Result<Self::Hash, Self::Error>;
|
||||
}
|
||||
|
||||
/// An abstraction for transaction pool.
|
||||
///
|
||||
/// This trait is used by offchain calls to be able to submit transactions.
|
||||
@@ -291,7 +315,7 @@ pub trait OffchainSubmitTransaction<Block: BlockT>: Send + Sync {
|
||||
) -> Result<(), ()>;
|
||||
}
|
||||
|
||||
impl<TPool: TransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
|
||||
impl<TPool: LocalTransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
|
||||
fn submit_at(
|
||||
&self,
|
||||
at: &BlockId<TPool::Block>,
|
||||
@@ -303,15 +327,14 @@ impl<TPool: TransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
|
||||
extrinsic
|
||||
);
|
||||
|
||||
let result = futures::executor::block_on(self.submit_one(
|
||||
&at, TransactionSource::Local, extrinsic,
|
||||
));
|
||||
let result = self.submit_local(&at, extrinsic);
|
||||
|
||||
result.map(|_| ())
|
||||
.map_err(|e| log::warn!(
|
||||
result.map(|_| ()).map_err(|e| {
|
||||
log::warn!(
|
||||
target: "txpool",
|
||||
"(offchain call) Error submitting a transaction to the pool: {:?}",
|
||||
e
|
||||
))
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user