From 00e5ecdcf7be1c43fedffd8e8fa529eea44e2512 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Wed, 31 May 2023 02:12:53 +0100 Subject: [PATCH] Make offchain tx pool creation reusable (#14230) * Make offchain tx pool creation reusable Introduces an `OffchainTransactionPoolFactory` for creating offchain transactions pools that can be registered in the runtime externalities context. This factory will be required for a later pr to make the creation of offchain transaction pools easier. * Update client/transaction-pool/api/src/lib.rs Co-authored-by: Anton --------- Co-authored-by: Anton --- substrate/Cargo.lock | 2 + .../client/api/src/execution_extensions.rs | 55 +++----------- substrate/client/offchain/src/lib.rs | 35 ++------- .../client/transaction-pool/api/Cargo.toml | 2 + .../client/transaction-pool/api/src/lib.rs | 76 ++++++++++++++++--- substrate/client/transaction-pool/src/lib.rs | 18 +++-- 6 files changed, 100 insertions(+), 88 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 5c44f781ee..254eb3f94f 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -10193,9 +10193,11 @@ dependencies = [ "async-trait", "futures", "log", + "parity-scale-codec", "serde", "serde_json", "sp-blockchain", + "sp-core", "sp-runtime", "thiserror", ] diff --git a/substrate/client/api/src/execution_extensions.rs b/substrate/client/api/src/execution_extensions.rs index 9ff4b6db41..20dc34d19b 100644 --- a/substrate/client/api/src/execution_extensions.rs +++ b/substrate/client/api/src/execution_extensions.rs @@ -22,20 +22,16 @@ //! strategy for the runtime calls and provide the right `Externalities` //! extensions to support APIs for particular execution context & capabilities. -use codec::Decode; use parking_lot::RwLock; -use sc_transaction_pool_api::OffchainSubmitTransaction; +use sc_transaction_pool_api::OffchainTransactionPoolFactory; use sp_core::{ - offchain::{self, OffchainDbExt, OffchainWorkerExt, TransactionPoolExt}, + offchain::{self, OffchainDbExt, OffchainWorkerExt}, traits::{ReadRuntimeVersion, ReadRuntimeVersionExt}, ExecutionContext, }; use sp_externalities::{Extension, Extensions}; use sp_keystore::{KeystoreExt, KeystorePtr}; -use sp_runtime::{ - generic::BlockId, - traits::{Block as BlockT, NumberFor}, -}; +use sp_runtime::traits::{Block as BlockT, NumberFor}; pub use sp_state_machine::ExecutionStrategy; use sp_state_machine::{DefaultHandler, ExecutionManager}; use std::{ @@ -168,11 +164,7 @@ pub struct ExecutionExtensions { offchain_db: Option>, // FIXME: these three are only RwLock because of https://github.com/paritytech/substrate/issues/4587 // remove when fixed. - // To break retain cycle between `Client` and `TransactionPool` we require this - // extension to be a `Weak` reference. - // That's also the reason why it's being registered lazily instead of - // during initialization. - transaction_pool: RwLock>>>, + transaction_pool_factory: RwLock>>, extensions_factory: RwLock>>, statement_store: RwLock>>, read_runtime_version: Arc, @@ -194,7 +186,7 @@ impl ExecutionExtensions { keystore, offchain_db, extensions_factory: RwLock::new(extensions_factory), - transaction_pool, + transaction_pool_factory: transaction_pool, statement_store, read_runtime_version, } @@ -211,11 +203,11 @@ impl ExecutionExtensions { } /// Register transaction pool extension. - pub fn register_transaction_pool(&self, pool: &Arc) - where - T: OffchainSubmitTransaction + 'static, - { - *self.transaction_pool.write() = Some(Arc::downgrade(pool) as _); + pub fn register_transaction_pool_factory( + &self, + factory: OffchainTransactionPoolFactory, + ) { + *self.transaction_pool_factory.write() = Some(factory); } /// Register statement store extension. @@ -245,11 +237,8 @@ impl ExecutionExtensions { } if capabilities.contains(offchain::Capabilities::TRANSACTION_POOL) { - if let Some(pool) = self.transaction_pool.read().as_ref().and_then(|x| x.upgrade()) { - extensions.register(TransactionPoolExt(Box::new(TransactionPoolAdapter { - at: BlockId::Hash(block_hash), - pool, - }) as _)); + if let Some(pool) = self.transaction_pool_factory.read().as_ref() { + extensions.register(pool.offchain_transaction_pool(block_hash)); } } @@ -303,23 +292,3 @@ impl ExecutionExtensions { (manager, self.extensions(block_hash, block_number, context)) } } - -/// A wrapper type to pass `BlockId` to the actual transaction pool. -struct TransactionPoolAdapter { - at: BlockId, - pool: Arc>, -} - -impl offchain::TransactionPool for TransactionPoolAdapter { - fn submit_transaction(&mut self, data: Vec) -> Result<(), ()> { - let xt = match Block::Extrinsic::decode(&mut &*data) { - Ok(xt) => xt, - Err(e) => { - log::warn!("Unable to decode extrinsic: {:?}: {}", data, e); - return Err(()) - }, - }; - - self.pool.submit_at(&self.at, xt) - } -} diff --git a/substrate/client/offchain/src/lib.rs b/substrate/client/offchain/src/lib.rs index 27f7df3837..0945e39a3a 100644 --- a/substrate/client/offchain/src/lib.rs +++ b/substrate/client/offchain/src/lib.rs @@ -247,16 +247,15 @@ mod tests { use sc_block_builder::BlockBuilderProvider as _; use sc_client_api::Backend as _; use sc_network::{config::MultiaddrWithPeerId, types::ProtocolName, ReputationChange}; - use sc_transaction_pool::{BasicPool, FullChainApi}; + use sc_transaction_pool::BasicPool; use sc_transaction_pool_api::{InPoolTransaction, TransactionPool}; use sp_consensus::BlockOrigin; - use sp_runtime::generic::BlockId; use std::{collections::HashSet, sync::Arc}; use substrate_test_runtime_client::{ runtime::{ - substrate_test_pallet::pallet::Call as PalletCall, Block, ExtrinsicBuilder, RuntimeCall, + substrate_test_pallet::pallet::Call as PalletCall, ExtrinsicBuilder, RuntimeCall, }, - ClientBlockImportExt, DefaultTestClientBuilderExt, TestClient, TestClientBuilderExt, + ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilderExt, }; struct TestNetwork(); @@ -337,34 +336,14 @@ mod tests { } } - struct TestPool(Arc, Block>>); - - impl sc_transaction_pool_api::OffchainSubmitTransaction for TestPool { - fn submit_at( - &self, - at: &BlockId, - extrinsic: ::Extrinsic, - ) -> Result<(), ()> { - let source = sc_transaction_pool_api::TransactionSource::Local; - futures::executor::block_on(self.0.submit_one(&at, source, extrinsic)) - .map(|_| ()) - .map_err(|_| ()) - } - } - #[test] fn should_call_into_runtime_and_produce_extrinsic() { sp_tracing::try_init_simple(); let client = Arc::new(substrate_test_runtime_client::new()); let spawner = sp_core::testing::TaskExecutor::new(); - let pool = TestPool(BasicPool::new_full( - Default::default(), - true.into(), - None, - spawner, - client.clone(), - )); + let pool = + BasicPool::new_full(Default::default(), true.into(), None, spawner, client.clone()); let network = Arc::new(TestNetwork()); let header = client.header(client.chain_info().genesis_hash).unwrap().unwrap(); @@ -373,9 +352,9 @@ mod tests { futures::executor::block_on(offchain.on_block_imported(&header, network, false)); // then - assert_eq!(pool.0.status().ready, 1); + assert_eq!(pool.status().ready, 1); assert!(matches!( - pool.0.ready().next().unwrap().data().function, + pool.ready().next().unwrap().data().function, RuntimeCall::SubstrateTest(PalletCall::storage_change { .. }) )); } diff --git a/substrate/client/transaction-pool/api/Cargo.toml b/substrate/client/transaction-pool/api/Cargo.toml index 23e3d8579f..282e9b1f61 100644 --- a/substrate/client/transaction-pool/api/Cargo.toml +++ b/substrate/client/transaction-pool/api/Cargo.toml @@ -10,11 +10,13 @@ description = "Transaction pool client facing API." [dependencies] async-trait = "0.1.57" +codec = { package = "parity-scale-codec", version = "3.2.2" } futures = "0.3.21" log = "0.4.17" serde = { version = "1.0.136", features = ["derive"] } thiserror = "1.0.30" sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" } +sp-core = { version = "8.0.0", default-features = false, path = "../../../primitives/core" } sp-runtime = { version = "8.0.0", default-features = false, path = "../../../primitives/runtime" } [dev-dependencies] diff --git a/substrate/client/transaction-pool/api/src/lib.rs b/substrate/client/transaction-pool/api/src/lib.rs index 428d0aed62..eab9614cb8 100644 --- a/substrate/client/transaction-pool/api/src/lib.rs +++ b/substrate/client/transaction-pool/api/src/lib.rs @@ -24,11 +24,17 @@ pub mod error; use async_trait::async_trait; use futures::{Future, Stream}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use sp_core::offchain::TransactionPoolExt; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, Member, NumberFor}, }; -use std::{collections::HashMap, hash::Hash, pin::Pin, sync::Arc}; +use std::{ + collections::HashMap, + hash::Hash, + pin::Pin, + sync::{Arc, Weak}, +}; const LOG_TARGET: &str = "txpool::api"; @@ -329,29 +335,27 @@ pub trait LocalTransactionPool: Send + Sync { /// `TransactionSource::Local`. fn submit_local( &self, - at: &BlockId, + at: ::Hash, xt: LocalTransactionFor, ) -> Result; } -/// An abstraction for transaction pool. +/// An abstraction for [`LocalTransactionPool`] /// -/// This trait is used by offchain calls to be able to submit transactions. -/// The main use case is for offchain workers, to feed back the results of computations, -/// but since the transaction pool access is a separate `ExternalitiesExtension` it can -/// be also used in context of other offchain calls. For one may generate and submit -/// a transaction for some misbehavior reports (say equivocation). -pub trait OffchainSubmitTransaction: Send + Sync { +/// We want to use a transaction pool in [`OffchainTransactionPoolFactory`] in a `Arc` without +/// bleeding the associated types besides the `Block`. Thus, this abstraction here exists to achieve +/// the wrapping in a `Arc`. +trait OffchainSubmitTransaction: Send + Sync { /// Submit transaction. /// /// The transaction will end up in the pool and be propagated to others. - fn submit_at(&self, at: &BlockId, extrinsic: Block::Extrinsic) -> Result<(), ()>; + fn submit_at(&self, at: Block::Hash, extrinsic: Block::Extrinsic) -> Result<(), ()>; } impl OffchainSubmitTransaction for TPool { fn submit_at( &self, - at: &BlockId, + at: ::Hash, extrinsic: ::Extrinsic, ) -> Result<(), ()> { log::debug!( @@ -372,6 +376,56 @@ impl OffchainSubmitTransaction for TP } } +/// Factory for creating [`TransactionPoolExt`]s. +/// +/// This provides an easy way for creating [`TransactionPoolExt`] extensions for registering them in +/// the wasm execution environment to send transactions from an offchain call to the runtime. +#[derive(Clone)] +pub struct OffchainTransactionPoolFactory { + // To break retain cycle between `Client` and `TransactionPool` we require this + // extension to be a `Weak` reference. + pool: Weak>, +} + +impl OffchainTransactionPoolFactory { + /// Creates a new instance using the given `tx_pool`. + pub fn new + 'static>(tx_pool: &Arc) -> Self { + Self { pool: Arc::downgrade(tx_pool) as Weak<_> } + } + + /// Returns an instance of [`TransactionPoolExt`] bound to the given `block_hash`. + /// + /// Transactions that are being submitted by this instance will be submitted with `block_hash` + /// as context for validation. + pub fn offchain_transaction_pool(&self, block_hash: Block::Hash) -> TransactionPoolExt { + TransactionPoolExt::new(OffchainTransactionPool { pool: self.pool.clone(), block_hash }) + } +} + +/// Wraps a `pool` and `block_hash` to implement [`sp_core::offchain::TransactionPool`]. +struct OffchainTransactionPool { + block_hash: Block::Hash, + pool: Weak>, +} + +impl sp_core::offchain::TransactionPool for OffchainTransactionPool { + fn submit_transaction(&mut self, extrinsic: Vec) -> Result<(), ()> { + let extrinsic = match codec::Decode::decode(&mut &extrinsic[..]) { + Ok(t) => t, + Err(e) => { + log::error!( + target: LOG_TARGET, + "Failed to decode extrinsic in `OffchainTransactionPool::submit_transaction`: {e:?}" + ); + + return Err(()) + }, + }; + + self.pool.upgrade().ok_or(())?.submit_at(self.block_hash, extrinsic) + } +} + /// Wrapper functions to keep the API backwards compatible over the wire for the old RPC spec. mod v1_compatible { use serde::{Deserialize, Deserializer, Serialize, Serializer}; diff --git a/substrate/client/transaction-pool/src/lib.rs b/substrate/client/transaction-pool/src/lib.rs index c3a85a373b..1b438bd7e4 100644 --- a/substrate/client/transaction-pool/src/lib.rs +++ b/substrate/client/transaction-pool/src/lib.rs @@ -52,8 +52,8 @@ use std::{ use graph::{ExtrinsicHash, IsValidator}; use sc_transaction_pool_api::{ error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool, - PoolFuture, PoolStatus, ReadyTransactions, TransactionFor, TransactionPool, TransactionSource, - TransactionStatusStreamFor, TxHash, + OffchainTransactionPoolFactory, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor, + TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash, }; use sp_core::traits::SpawnEssentialNamed; use sp_runtime::{ @@ -397,7 +397,9 @@ where )); // make transaction pool available for off-chain runtime calls. - client.execution_extensions().register_transaction_pool(&pool); + client + .execution_extensions() + .register_transaction_pool_factory(OffchainTransactionPoolFactory::new(&pool)); pool } @@ -421,7 +423,7 @@ where fn submit_local( &self, - at: &BlockId, + at: Block::Hash, xt: sc_transaction_pool_api::LocalTransactionFor, ) -> Result { use sp_runtime::{ @@ -430,7 +432,11 @@ where let validity = self .api - .validate_transaction_blocking(at, TransactionSource::Local, xt.clone())? + .validate_transaction_blocking( + &BlockId::hash(at), + TransactionSource::Local, + xt.clone(), + )? .map_err(|e| { Self::Error::Pool(match e { TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i), @@ -441,7 +447,7 @@ where let (hash, bytes) = self.pool.validated_pool().api().hash_and_length(&xt); let block_number = self .api - .block_id_to_number(at)? + .block_id_to_number(&BlockId::hash(at))? .ok_or_else(|| error::Error::BlockIdConversion(format!("{:?}", at)))?; let validated = ValidatedTransaction::valid_at(