mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-21 05:11:02 +00:00
Handle AccountIndices in transaction pool (#225)
* Merge remote-tracking branch 'origin/master' into gav-xts-dont-panic * Update wasm. * consensus, session and staking all panic-safe. * Democracy doesn't panic in apply. * Fix tests. * Extra helper macro, council depanicked. * Fix one test. * Fix up all council tests. No panics! * Council voting depanicked. * Dispatch returns result. * session & staking tests updated * Fix democracy tests. * Fix council tests. * Fix up polkadot parachains in runtime * Fix borked merge * More Slicable support Support general `Option` and array types. * Basic storage types. * Existential deposit for contract creation * Basic implemnetation along with removals * Fix tests. * externalities builder fix. * Tests. * Fix up the runtime. * Fix tests. * Add generic `Address` type. * Initial function integration of Address into Extrinsic. * Fix build * All tests compile. * Fix (some) tests. * Fix signing. * Push error. * transfer can accept Address * Make Address generic over AccountIndex * Fix test * Make Council use Address for dispatch. * Fix build * Bend over backwards to support braindead derive. * Repot some files. * Fix tests. * Fix grumbles * Remove Default bound * Fix build for new nightly. * Make `apply_extrinsic` never panic, return useful Result. * More merge hell * Doesn't build, but might do soon * Serde woes * get substrate-runtime-staking compiling * Polkadot builds again! * Fix all build. * Fix tests & binaries. * Reserve some extra initial byte values of address for future format changes * Make semantic of `ReservedBalance` clear. * Fix panic handler. * Integrate other balance transformations into the new model Fix up staking tests. * Fix runtime tests. * Fix panic build. * Tests for demonstrating interaction between balance types. * Repot some runtime code * Fix checkedblock in non-std builds * Get rid of `DoLookup` phantom. * Attempt to make transaction_pool work with lookups. * Remove vscode settings * New attempt at making transaction pool work. * It builds again! * --all builds * Fix tests. * New build. * Test account nonce reset. * polkadot transaction pool tests/framework. * Address grumbles. * Pool support non-verified transactions. * Revert bad `map_or` * Rebuild binaries, workaround. * Avoid casting to usize early. * Make verification use provided block_id. * Fix tests. * Alter tests to use retry. * Fix tests & add call to re-verify. * Semi-refactor. * Integrate new queue with the rest of the code. * Fix tests. * Add reverify_transaction method. * Use result.
This commit is contained in:
@@ -50,15 +50,15 @@ pub mod error;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use demo_primitives::Hash;
|
use demo_primitives::Hash;
|
||||||
use demo_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyConfig,
|
use demo_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyConfig,
|
||||||
SessionConfig, StakingConfig, BuildStorage};
|
SessionConfig, StakingConfig};
|
||||||
use demo_runtime::{Block, UncheckedExtrinsic};
|
use demo_runtime::{Block, BlockId, UncheckedExtrinsic, BuildStorage};
|
||||||
use futures::{Future, Sink, Stream};
|
use futures::{Future, Sink, Stream};
|
||||||
|
|
||||||
struct DummyPool;
|
struct DummyPool;
|
||||||
impl extrinsic_pool::api::ExtrinsicPool<UncheckedExtrinsic, Hash> for DummyPool {
|
impl extrinsic_pool::api::ExtrinsicPool<UncheckedExtrinsic, BlockId, Hash> for DummyPool {
|
||||||
type Error = extrinsic_pool::txpool::Error;
|
type Error = extrinsic_pool::txpool::Error;
|
||||||
|
|
||||||
fn submit(&self, _: Vec<UncheckedExtrinsic>)
|
fn submit(&self, _block: BlockId, _: Vec<UncheckedExtrinsic>)
|
||||||
-> Result<Vec<Hash>, Self::Error>
|
-> Result<Vec<Hash>, Self::Error>
|
||||||
{
|
{
|
||||||
Err("unimplemented".into())
|
Err("unimplemented".into())
|
||||||
@@ -155,7 +155,8 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
|
|||||||
let _rpc_servers = {
|
let _rpc_servers = {
|
||||||
let handler = || {
|
let handler = || {
|
||||||
let chain = rpc::apis::chain::Chain::new(client.clone(), core.remote());
|
let chain = rpc::apis::chain::Chain::new(client.clone(), core.remote());
|
||||||
rpc::rpc_handler::<Block, _, _, _, _>(client.clone(), chain, Arc::new(DummyPool), DummySystem)
|
let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool));
|
||||||
|
rpc::rpc_handler::<Block, _, _, _, _>(client.clone(), chain, author, DummySystem)
|
||||||
};
|
};
|
||||||
let http_address = "127.0.0.1:9933".parse().unwrap();
|
let http_address = "127.0.0.1:9933".parse().unwrap();
|
||||||
let ws_address = "127.0.0.1:9944".parse().unwrap();
|
let ws_address = "127.0.0.1:9944".parse().unwrap();
|
||||||
|
|||||||
@@ -162,6 +162,8 @@ pub type Address = staking::Address<Concrete>;
|
|||||||
pub type Header = generic::Header<BlockNumber, BlakeTwo256, Vec<u8>>;
|
pub type Header = generic::Header<BlockNumber, BlakeTwo256, Vec<u8>>;
|
||||||
/// Block type as expected by this runtime.
|
/// Block type as expected by this runtime.
|
||||||
pub type Block = generic::Block<Header, UncheckedExtrinsic>;
|
pub type Block = generic::Block<Header, UncheckedExtrinsic>;
|
||||||
|
/// BlockId type as expected by this runtime.
|
||||||
|
pub type BlockId = generic::BlockId<Block>;
|
||||||
/// Unchecked extrinsic type as expected by this runtime.
|
/// Unchecked extrinsic type as expected by this runtime.
|
||||||
pub type UncheckedExtrinsic = generic::UncheckedExtrinsic<Address, Index, Call, Signature>;
|
pub type UncheckedExtrinsic = generic::UncheckedExtrinsic<Address, Index, Call, Signature>;
|
||||||
/// Extrinsic type as expected by this runtime. This is not the type that is signed.
|
/// Extrinsic type as expected by this runtime. This is not the type that is signed.
|
||||||
|
|||||||
@@ -27,12 +27,12 @@ parking_lot = "0.4"
|
|||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
serde = "1.0"
|
serde = "1.0"
|
||||||
substrate-client = { path = "../../substrate/client" }
|
substrate-client = { path = "../../substrate/client" }
|
||||||
substrate-state-machine = { path = "../../substrate/state-machine" }
|
|
||||||
substrate-rpc = { path = "../../substrate/rpc" }
|
|
||||||
substrate-rpc-servers = { path = "../../substrate/rpc-servers" }
|
|
||||||
substrate-network = { path = "../../substrate/network" }
|
substrate-network = { path = "../../substrate/network" }
|
||||||
substrate-primitives = { path = "../../substrate/primitives" }
|
substrate-primitives = { path = "../../substrate/primitives" }
|
||||||
|
substrate-rpc = { path = "../../substrate/rpc" }
|
||||||
|
substrate-rpc-servers = { path = "../../substrate/rpc-servers" }
|
||||||
substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" }
|
substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" }
|
||||||
|
substrate-state-machine = { path = "../../substrate/state-machine" }
|
||||||
substrate-telemetry = { path = "../../substrate/telemetry" }
|
substrate-telemetry = { path = "../../substrate/telemetry" }
|
||||||
polkadot-primitives = { path = "../primitives" }
|
polkadot-primitives = { path = "../primitives" }
|
||||||
polkadot-runtime = { path = "../runtime" }
|
polkadot-runtime = { path = "../runtime" }
|
||||||
|
|||||||
@@ -34,13 +34,13 @@ extern crate parking_lot;
|
|||||||
extern crate serde;
|
extern crate serde;
|
||||||
extern crate serde_json;
|
extern crate serde_json;
|
||||||
|
|
||||||
extern crate substrate_primitives;
|
|
||||||
extern crate substrate_state_machine as state_machine;
|
|
||||||
extern crate substrate_client as client;
|
extern crate substrate_client as client;
|
||||||
extern crate substrate_network as network;
|
extern crate substrate_network as network;
|
||||||
|
extern crate substrate_primitives;
|
||||||
extern crate substrate_rpc;
|
extern crate substrate_rpc;
|
||||||
extern crate substrate_rpc_servers as rpc;
|
extern crate substrate_rpc_servers as rpc;
|
||||||
extern crate substrate_runtime_primitives as runtime_primitives;
|
extern crate substrate_runtime_primitives as runtime_primitives;
|
||||||
|
extern crate substrate_state_machine as state_machine;
|
||||||
extern crate polkadot_primitives;
|
extern crate polkadot_primitives;
|
||||||
extern crate polkadot_runtime;
|
extern crate polkadot_runtime;
|
||||||
extern crate polkadot_service as service;
|
extern crate polkadot_service as service;
|
||||||
@@ -192,12 +192,12 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
|
|||||||
|
|
||||||
let (mut genesis_storage, boot_nodes) = PresetConfig::from_spec(chain_spec)
|
let (mut genesis_storage, boot_nodes) = PresetConfig::from_spec(chain_spec)
|
||||||
.map(PresetConfig::deconstruct)
|
.map(PresetConfig::deconstruct)
|
||||||
.unwrap_or_else(|f| (Box::new(move ||
|
.unwrap_or_else(|f| (Box::new(move ||
|
||||||
read_storage_json(&f)
|
read_storage_json(&f)
|
||||||
.map(|s| { info!("{} storage items read from {}", s.len(), f); s })
|
.map(|s| { info!("{} storage items read from {}", s.len(), f); s })
|
||||||
.unwrap_or_else(|| panic!("Bad genesis state file: {}", f))
|
.unwrap_or_else(|| panic!("Bad genesis state file: {}", f))
|
||||||
), vec![]));
|
), vec![]));
|
||||||
|
|
||||||
if matches.is_present("build-genesis") {
|
if matches.is_present("build-genesis") {
|
||||||
info!("Building genesis");
|
info!("Building genesis");
|
||||||
for (i, (k, v)) in genesis_storage().iter().enumerate() {
|
for (i, (k, v)) in genesis_storage().iter().enumerate() {
|
||||||
@@ -285,10 +285,11 @@ fn run_until_exit<C>(mut core: reactor::Core, service: service::Service<C>, matc
|
|||||||
|
|
||||||
let handler = || {
|
let handler = || {
|
||||||
let chain = rpc::apis::chain::Chain::new(service.client(), core.remote());
|
let chain = rpc::apis::chain::Chain::new(service.client(), core.remote());
|
||||||
|
let author = rpc::apis::author::Author::new(service.client(), service.transaction_pool());
|
||||||
rpc::rpc_handler::<Block, _, _, _, _>(
|
rpc::rpc_handler::<Block, _, _, _, _>(
|
||||||
service.client(),
|
service.client(),
|
||||||
chain,
|
chain,
|
||||||
service.transaction_pool(),
|
author,
|
||||||
sys_conf.clone(),
|
sys_conf.clone(),
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header, Timestamp};
|
|||||||
use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt};
|
use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt};
|
||||||
use polkadot_runtime::BareExtrinsic;
|
use polkadot_runtime::BareExtrinsic;
|
||||||
use primitives::AuthorityId;
|
use primitives::AuthorityId;
|
||||||
use transaction_pool::{Ready, TransactionPool};
|
use transaction_pool::{TransactionPool};
|
||||||
use tokio_core::reactor::{Handle, Timeout, Interval};
|
use tokio_core::reactor::{Handle, Timeout, Interval};
|
||||||
|
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
@@ -226,7 +226,7 @@ pub struct ProposerFactory<C, N, P> {
|
|||||||
/// The client instance.
|
/// The client instance.
|
||||||
pub client: Arc<C>,
|
pub client: Arc<C>,
|
||||||
/// The transaction pool.
|
/// The transaction pool.
|
||||||
pub transaction_pool: Arc<TransactionPool>,
|
pub transaction_pool: Arc<TransactionPool<C>>,
|
||||||
/// The backing network handle.
|
/// The backing network handle.
|
||||||
pub network: N,
|
pub network: N,
|
||||||
/// Parachain collators.
|
/// Parachain collators.
|
||||||
@@ -239,7 +239,8 @@ pub struct ProposerFactory<C, N, P> {
|
|||||||
|
|
||||||
impl<C, N, P> bft::ProposerFactory<Block> for ProposerFactory<C, N, P>
|
impl<C, N, P> bft::ProposerFactory<Block> for ProposerFactory<C, N, P>
|
||||||
where
|
where
|
||||||
C: PolkadotApi,
|
C: PolkadotApi + Send + Sync,
|
||||||
|
C::CheckedBlockId: Sync,
|
||||||
N: Network,
|
N: Network,
|
||||||
P: Collators,
|
P: Collators,
|
||||||
{
|
{
|
||||||
@@ -319,12 +320,13 @@ pub struct Proposer<C: PolkadotApi, R, P> {
|
|||||||
random_seed: Hash,
|
random_seed: Hash,
|
||||||
router: R,
|
router: R,
|
||||||
table: Arc<SharedTable>,
|
table: Arc<SharedTable>,
|
||||||
transaction_pool: Arc<TransactionPool>,
|
transaction_pool: Arc<TransactionPool<C>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<C, R, P> bft::Proposer<Block> for Proposer<C, R, P>
|
impl<C, R, P> bft::Proposer<Block> for Proposer<C, R, P>
|
||||||
where
|
where
|
||||||
C: PolkadotApi,
|
C: PolkadotApi + Send + Sync,
|
||||||
|
C::CheckedBlockId: Sync,
|
||||||
R: TableRouter,
|
R: TableRouter,
|
||||||
P: Collators,
|
P: Collators,
|
||||||
{
|
{
|
||||||
@@ -501,8 +503,7 @@ impl<C, R, P> bft::Proposer<Block> for Proposer<C, R, P>
|
|||||||
|
|
||||||
let local_id = self.local_key.public().0.into();
|
let local_id = self.local_key.public().0.into();
|
||||||
let mut next_index = {
|
let mut next_index = {
|
||||||
let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client);
|
let cur_index = self.transaction_pool.cull_and_get_pending(BlockId::hash(self.parent_hash), |pending| pending
|
||||||
let cur_index = self.transaction_pool.cull_and_get_pending(readiness_evaluator, |pending| pending
|
|
||||||
.filter(|tx| tx.sender().map(|s| s == local_id).unwrap_or(false))
|
.filter(|tx| tx.sender().map(|s| s == local_id).unwrap_or(false))
|
||||||
.last()
|
.last()
|
||||||
.map(|tx| Ok(tx.index()))
|
.map(|tx| Ok(tx.index()))
|
||||||
@@ -510,7 +511,11 @@ impl<C, R, P> bft::Proposer<Block> for Proposer<C, R, P>
|
|||||||
);
|
);
|
||||||
|
|
||||||
match cur_index {
|
match cur_index {
|
||||||
Ok(cur_index) => cur_index + 1,
|
Ok(Ok(cur_index)) => cur_index + 1,
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
warn!(target: "consensus", "Error computing next transaction index: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(target: "consensus", "Error computing next transaction index: {}", e);
|
warn!(target: "consensus", "Error computing next transaction index: {}", e);
|
||||||
return;
|
return;
|
||||||
@@ -549,7 +554,7 @@ impl<C, R, P> bft::Proposer<Block> for Proposer<C, R, P>
|
|||||||
};
|
};
|
||||||
let uxt = UncheckedExtrinsic::new(extrinsic, signature);
|
let uxt = UncheckedExtrinsic::new(extrinsic, signature);
|
||||||
|
|
||||||
self.transaction_pool.import_unchecked_extrinsic(uxt)
|
self.transaction_pool.import_unchecked_extrinsic(BlockId::hash(self.parent_hash), uxt)
|
||||||
.expect("locally signed extrinsic is valid; qed");
|
.expect("locally signed extrinsic is valid; qed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -618,7 +623,7 @@ pub struct CreateProposal<C: PolkadotApi, R, P: Collators> {
|
|||||||
parent_number: BlockNumber,
|
parent_number: BlockNumber,
|
||||||
parent_id: C::CheckedBlockId,
|
parent_id: C::CheckedBlockId,
|
||||||
client: Arc<C>,
|
client: Arc<C>,
|
||||||
transaction_pool: Arc<TransactionPool>,
|
transaction_pool: Arc<TransactionPool<C>>,
|
||||||
collation: CollationFetch<P, C>,
|
collation: CollationFetch<P, C>,
|
||||||
router: R,
|
router: R,
|
||||||
table: Arc<SharedTable>,
|
table: Arc<SharedTable>,
|
||||||
@@ -640,9 +645,8 @@ impl<C, R, P> CreateProposal<C, R, P>
|
|||||||
let mut block_builder = self.client.build_block(&self.parent_id, timestamp, candidates)?;
|
let mut block_builder = self.client.build_block(&self.parent_id, timestamp, candidates)?;
|
||||||
|
|
||||||
{
|
{
|
||||||
let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client);
|
|
||||||
let mut unqueue_invalid = Vec::new();
|
let mut unqueue_invalid = Vec::new();
|
||||||
self.transaction_pool.cull_and_get_pending(readiness_evaluator, |pending_iterator| {
|
let result = self.transaction_pool.cull_and_get_pending(BlockId::hash(self.parent_hash), |pending_iterator| {
|
||||||
let mut pending_size = 0;
|
let mut pending_size = 0;
|
||||||
for pending in pending_iterator {
|
for pending in pending_iterator {
|
||||||
// skip and cull transactions which are too large.
|
// skip and cull transactions which are too large.
|
||||||
@@ -664,6 +668,9 @@ impl<C, R, P> CreateProposal<C, R, P>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
if let Err(e) = result {
|
||||||
|
warn!("Unable to get the pending set: {:?}", e);
|
||||||
|
}
|
||||||
|
|
||||||
self.transaction_pool.remove(&unqueue_invalid, false);
|
self.transaction_pool.remove(&unqueue_invalid, false);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -235,13 +235,14 @@ impl Service {
|
|||||||
client: Arc<C>,
|
client: Arc<C>,
|
||||||
api: Arc<A>,
|
api: Arc<A>,
|
||||||
network: Arc<net::ConsensusService<Block>>,
|
network: Arc<net::ConsensusService<Block>>,
|
||||||
transaction_pool: Arc<TransactionPool>,
|
transaction_pool: Arc<TransactionPool<A>>,
|
||||||
parachain_empty_duration: Duration,
|
parachain_empty_duration: Duration,
|
||||||
key: ed25519::Pair,
|
key: ed25519::Pair,
|
||||||
) -> Service
|
) -> Service
|
||||||
where
|
where
|
||||||
A: LocalPolkadotApi + Send + Sync + 'static,
|
A: LocalPolkadotApi + Send + Sync + 'static,
|
||||||
C: BlockchainEvents<Block> + ChainHead<Block> + bft::BlockImport<Block> + bft::Authorities<Block> + Send + Sync + 'static,
|
C: BlockchainEvents<Block> + ChainHead<Block> + bft::BlockImport<Block> + bft::Authorities<Block> + Send + Sync + 'static,
|
||||||
|
A::CheckedBlockId: Sync,
|
||||||
{
|
{
|
||||||
let (signal, exit) = ::exit_future::signal();
|
let (signal, exit) = ::exit_future::signal();
|
||||||
let thread = thread::spawn(move || {
|
let thread = thread::spawn(move || {
|
||||||
|
|||||||
@@ -55,11 +55,11 @@ pub trait Components {
|
|||||||
fn build_api(&self, client: Arc<Client<Self::Backend, Self::Executor, Block>>) -> Arc<Self::Api>;
|
fn build_api(&self, client: Arc<Client<Self::Backend, Self::Executor, Block>>) -> Arc<Self::Api>;
|
||||||
|
|
||||||
/// Create network transaction pool adapter.
|
/// Create network transaction pool adapter.
|
||||||
fn build_network_tx_pool(&self, client: Arc<client::Client<Self::Backend, Self::Executor, Block>>, api: Arc<Self::Api>, tx_pool: Arc<TransactionPool>)
|
fn build_network_tx_pool(&self, client: Arc<client::Client<Self::Backend, Self::Executor, Block>>, tx_pool: Arc<TransactionPool<Self::Api>>)
|
||||||
-> Arc<network::TransactionPool<Block>>;
|
-> Arc<network::TransactionPool<Block>>;
|
||||||
|
|
||||||
/// Create consensus service.
|
/// Create consensus service.
|
||||||
fn build_consensus(&self, client: Arc<Client<Self::Backend, Self::Executor, Block>>, network: Arc<network::Service<Block>>, tx_pool: Arc<TransactionPool>, keystore: &Keystore)
|
fn build_consensus(&self, client: Arc<Client<Self::Backend, Self::Executor, Block>>, network: Arc<network::Service<Block>>, tx_pool: Arc<TransactionPool<Self::Api>>, keystore: &Keystore)
|
||||||
-> Result<Option<consensus::Service>, error::Error>;
|
-> Result<Option<consensus::Service>, error::Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -83,17 +83,16 @@ impl Components for FullComponents {
|
|||||||
client
|
client
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_network_tx_pool(&self, client: Arc<client::Client<Self::Backend, Self::Executor, Block>>, api: Arc<Self::Api>, pool: Arc<TransactionPool>)
|
fn build_network_tx_pool(&self, client: Arc<client::Client<Self::Backend, Self::Executor, Block>>, pool: Arc<TransactionPool<Self::Api>>)
|
||||||
-> Arc<network::TransactionPool<Block>> {
|
-> Arc<network::TransactionPool<Block>> {
|
||||||
Arc::new(TransactionPoolAdapter {
|
Arc::new(TransactionPoolAdapter {
|
||||||
imports_external_transactions: true,
|
imports_external_transactions: true,
|
||||||
pool,
|
pool,
|
||||||
client,
|
client,
|
||||||
api,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_consensus(&self, client: Arc<client::Client<Self::Backend, Self::Executor, Block>>, network: Arc<network::Service<Block>>, tx_pool: Arc<TransactionPool>, keystore: &Keystore)
|
fn build_consensus(&self, client: Arc<client::Client<Self::Backend, Self::Executor, Block>>, network: Arc<network::Service<Block>>, tx_pool: Arc<TransactionPool<Self::Api>>, keystore: &Keystore)
|
||||||
-> Result<Option<consensus::Service>, error::Error> {
|
-> Result<Option<consensus::Service>, error::Error> {
|
||||||
if !self.is_validator {
|
if !self.is_validator {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
@@ -134,17 +133,16 @@ impl Components for LightComponents {
|
|||||||
Arc::new(polkadot_api::light::RemotePolkadotApiWrapper(client.clone()))
|
Arc::new(polkadot_api::light::RemotePolkadotApiWrapper(client.clone()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_network_tx_pool(&self, client: Arc<client::Client<Self::Backend, Self::Executor, Block>>, api: Arc<Self::Api>, pool: Arc<TransactionPool>)
|
fn build_network_tx_pool(&self, client: Arc<client::Client<Self::Backend, Self::Executor, Block>>, pool: Arc<TransactionPool<Self::Api>>)
|
||||||
-> Arc<network::TransactionPool<Block>> {
|
-> Arc<network::TransactionPool<Block>> {
|
||||||
Arc::new(TransactionPoolAdapter {
|
Arc::new(TransactionPoolAdapter {
|
||||||
imports_external_transactions: false,
|
imports_external_transactions: false,
|
||||||
pool,
|
pool,
|
||||||
client,
|
client,
|
||||||
api,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_consensus(&self, _client: Arc<client::Client<Self::Backend, Self::Executor, Block>>, _network: Arc<network::Service<Block>>, _tx_pool: Arc<TransactionPool>, _keystore: &Keystore)
|
fn build_consensus(&self, _client: Arc<client::Client<Self::Backend, Self::Executor, Block>>, _network: Arc<network::Service<Block>>, _tx_pool: Arc<TransactionPool<Self::Api>>, _keystore: &Keystore)
|
||||||
-> Result<Option<consensus::Service>, error::Error> {
|
-> Result<Option<consensus::Service>, error::Error> {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
@@ -153,9 +151,25 @@ impl Components for LightComponents {
|
|||||||
/// Transaction pool adapter.
|
/// Transaction pool adapter.
|
||||||
pub struct TransactionPoolAdapter<B, E, A> where A: Send + Sync, E: Send + Sync {
|
pub struct TransactionPoolAdapter<B, E, A> where A: Send + Sync, E: Send + Sync {
|
||||||
imports_external_transactions: bool,
|
imports_external_transactions: bool,
|
||||||
pool: Arc<TransactionPool>,
|
pool: Arc<TransactionPool<A>>,
|
||||||
client: Arc<Client<B, E, Block>>,
|
client: Arc<Client<B, E, Block>>,
|
||||||
api: Arc<A>,
|
}
|
||||||
|
|
||||||
|
impl<B, E, A> TransactionPoolAdapter<B, E, A>
|
||||||
|
where
|
||||||
|
A: Send + Sync,
|
||||||
|
B: client::backend::Backend<Block> + Send + Sync,
|
||||||
|
E: client::CallExecutor<Block> + Send + Sync,
|
||||||
|
client::error::Error: From<<<B as client::backend::Backend<Block>>::State as state_machine::backend::Backend>::Error>,
|
||||||
|
{
|
||||||
|
fn best_block_id(&self) -> Option<BlockId> {
|
||||||
|
self.client.info()
|
||||||
|
.map(|info| BlockId::hash(info.chain.best_hash))
|
||||||
|
.map_err(|e| {
|
||||||
|
debug!("Error getting best block: {:?}", e);
|
||||||
|
})
|
||||||
|
.ok()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B, E, A> network::TransactionPool<Block> for TransactionPoolAdapter<B, E, A>
|
impl<B, E, A> network::TransactionPool<Block> for TransactionPoolAdapter<B, E, A>
|
||||||
@@ -166,28 +180,20 @@ impl<B, E, A> network::TransactionPool<Block> for TransactionPoolAdapter<B, E, A
|
|||||||
A: polkadot_api::PolkadotApi + Send + Sync,
|
A: polkadot_api::PolkadotApi + Send + Sync,
|
||||||
{
|
{
|
||||||
fn transactions(&self) -> Vec<(Hash, Vec<u8>)> {
|
fn transactions(&self) -> Vec<(Hash, Vec<u8>)> {
|
||||||
let best_block = match self.client.info() {
|
let best_block_id = match self.best_block_id() {
|
||||||
Ok(info) => info.chain.best_hash,
|
Some(id) => id,
|
||||||
Err(e) => {
|
None => return vec![],
|
||||||
debug!("Error getting best block: {:?}", e);
|
|
||||||
return Vec::new();
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
self.pool.cull_and_get_pending(best_block_id, |pending| pending
|
||||||
let id = match self.api.check_id(BlockId::hash(best_block)) {
|
|
||||||
Ok(id) => id,
|
|
||||||
Err(_) => return Vec::new(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let ready = transaction_pool::Ready::create(id, &*self.api);
|
|
||||||
|
|
||||||
self.pool.cull_and_get_pending(ready, |pending| pending
|
|
||||||
.map(|t| {
|
.map(|t| {
|
||||||
let hash = t.hash().clone();
|
let hash = t.hash().clone();
|
||||||
(hash, t.primitive_extrinsic())
|
(hash, t.primitive_extrinsic())
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
)
|
).unwrap_or_else(|e| {
|
||||||
|
warn!("Error retrieving pending set: {}", e);
|
||||||
|
vec![]
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn import(&self, transaction: &Vec<u8>) -> Option<Hash> {
|
fn import(&self, transaction: &Vec<u8>) -> Option<Hash> {
|
||||||
@@ -197,7 +203,8 @@ impl<B, E, A> network::TransactionPool<Block> for TransactionPoolAdapter<B, E, A
|
|||||||
|
|
||||||
let encoded = transaction.encode();
|
let encoded = transaction.encode();
|
||||||
if let Some(uxt) = codec::Slicable::decode(&mut &encoded[..]) {
|
if let Some(uxt) = codec::Slicable::decode(&mut &encoded[..]) {
|
||||||
match self.pool.import_unchecked_extrinsic(uxt) {
|
let best_block_id = self.best_block_id()?;
|
||||||
|
match self.pool.import_unchecked_extrinsic(best_block_id, uxt) {
|
||||||
Ok(xt) => Some(*xt.hash()),
|
Ok(xt) => Some(*xt.hash()),
|
||||||
Err(e) => match *e.kind() {
|
Err(e) => match *e.kind() {
|
||||||
transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()),
|
transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()),
|
||||||
|
|||||||
@@ -75,7 +75,7 @@ pub struct Service<Components: components::Components> {
|
|||||||
thread: Option<thread::JoinHandle<()>>,
|
thread: Option<thread::JoinHandle<()>>,
|
||||||
client: Arc<Client<Components::Backend, Components::Executor, Block>>,
|
client: Arc<Client<Components::Backend, Components::Executor, Block>>,
|
||||||
network: Arc<network::Service<Block>>,
|
network: Arc<network::Service<Block>>,
|
||||||
transaction_pool: Arc<TransactionPool>,
|
transaction_pool: Arc<TransactionPool<Components::Api>>,
|
||||||
signal: Option<Signal>,
|
signal: Option<Signal>,
|
||||||
_consensus: Option<consensus::Service>,
|
_consensus: Option<consensus::Service>,
|
||||||
}
|
}
|
||||||
@@ -127,8 +127,8 @@ impl<Components> Service<Components>
|
|||||||
info!("Best block is #{}", best_header.number);
|
info!("Best block is #{}", best_header.number);
|
||||||
telemetry!("node.start"; "height" => best_header.number, "best" => ?best_header.hash());
|
telemetry!("node.start"; "height" => best_header.number, "best" => ?best_header.hash());
|
||||||
|
|
||||||
let transaction_pool = Arc::new(TransactionPool::new(config.transaction_pool));
|
let transaction_pool = Arc::new(TransactionPool::new(config.transaction_pool, api.clone()));
|
||||||
let transaction_pool_adapter = components.build_network_tx_pool(client.clone(), api.clone(), transaction_pool.clone());
|
let transaction_pool_adapter = components.build_network_tx_pool(client.clone(), transaction_pool.clone());
|
||||||
let network_params = network::Params {
|
let network_params = network::Params {
|
||||||
config: network::ProtocolConfig {
|
config: network::ProtocolConfig {
|
||||||
roles: config.roles,
|
roles: config.roles,
|
||||||
@@ -161,7 +161,8 @@ impl<Components> Service<Components>
|
|||||||
let events = client.import_notification_stream()
|
let events = client.import_notification_stream()
|
||||||
.for_each(move |notification| {
|
.for_each(move |notification| {
|
||||||
network1.on_block_imported(notification.hash, ¬ification.header);
|
network1.on_block_imported(notification.hash, ¬ification.header);
|
||||||
prune_imported(&*api, &*txpool1, notification.hash);
|
prune_imported(&*txpool1, notification.hash);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
core.handle().spawn(events);
|
core.handle().spawn(events);
|
||||||
@@ -210,22 +211,22 @@ impl<Components> Service<Components>
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get shared transaction pool instance.
|
/// Get shared transaction pool instance.
|
||||||
pub fn transaction_pool(&self) -> Arc<TransactionPool> {
|
pub fn transaction_pool(&self) -> Arc<TransactionPool<Components::Api>> {
|
||||||
self.transaction_pool.clone()
|
self.transaction_pool.clone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Produce a task which prunes any finalized transactions from the pool.
|
/// Produce a task which prunes any finalized transactions from the pool.
|
||||||
pub fn prune_imported<A>(api: &A, pool: &TransactionPool, hash: Hash)
|
pub fn prune_imported<A>(pool: &TransactionPool<A>, hash: Hash)
|
||||||
where
|
where A: PolkadotApi,
|
||||||
A: PolkadotApi,
|
|
||||||
{
|
{
|
||||||
match api.check_id(BlockId::hash(hash)) {
|
let block = BlockId::hash(hash);
|
||||||
Ok(id) => {
|
if let Err(e) = pool.cull(block) {
|
||||||
let ready = transaction_pool::Ready::create(id, api);
|
warn!("Culling error: {:?}", e);
|
||||||
pool.cull(None, ready);
|
}
|
||||||
},
|
|
||||||
Err(e) => warn!("Failed to check block id: {:?}", e),
|
if let Err(e) = pool.retry_verification(block) {
|
||||||
|
warn!("Re-verifying error: {:?}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,12 +15,14 @@
|
|||||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use extrinsic_pool::{self, txpool};
|
use extrinsic_pool::{self, txpool};
|
||||||
|
use polkadot_api;
|
||||||
use primitives::Hash;
|
use primitives::Hash;
|
||||||
use runtime::{Address, UncheckedExtrinsic};
|
use runtime::{Address, UncheckedExtrinsic};
|
||||||
|
|
||||||
error_chain! {
|
error_chain! {
|
||||||
links {
|
links {
|
||||||
Pool(txpool::Error, txpool::ErrorKind);
|
Pool(txpool::Error, txpool::ErrorKind);
|
||||||
|
Api(polkadot_api::Error, polkadot_api::ErrorKind);
|
||||||
}
|
}
|
||||||
errors {
|
errors {
|
||||||
/// Unexpected extrinsic format submitted
|
/// Unexpected extrinsic format submitted
|
||||||
@@ -53,11 +55,6 @@ error_chain! {
|
|||||||
description("Unrecognised address in extrinsic"),
|
description("Unrecognised address in extrinsic"),
|
||||||
display("Unrecognised address in extrinsic: {}", who),
|
display("Unrecognised address in extrinsic: {}", who),
|
||||||
}
|
}
|
||||||
/// Extrinsic is not yet checked.
|
|
||||||
NotReady {
|
|
||||||
description("Indexed address is unverified"),
|
|
||||||
display("Indexed address is unverified"),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,6 +15,7 @@
|
|||||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
extern crate ed25519;
|
extern crate ed25519;
|
||||||
|
extern crate substrate_client as client;
|
||||||
extern crate substrate_codec as codec;
|
extern crate substrate_codec as codec;
|
||||||
extern crate substrate_extrinsic_pool as extrinsic_pool;
|
extern crate substrate_extrinsic_pool as extrinsic_pool;
|
||||||
extern crate substrate_primitives as substrate_primitives;
|
extern crate substrate_primitives as substrate_primitives;
|
||||||
@@ -37,19 +38,17 @@ mod error;
|
|||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
cmp::Ordering,
|
cmp::Ordering,
|
||||||
collections::{hash_map::Entry, HashMap},
|
collections::HashMap,
|
||||||
ops::Deref,
|
ops::Deref,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
result
|
|
||||||
};
|
};
|
||||||
use parking_lot::Mutex;
|
|
||||||
|
|
||||||
use codec::Slicable;
|
use codec::Slicable;
|
||||||
use extrinsic_pool::{Pool, txpool::{self, Readiness, scoring::{Change, Choice}}};
|
use extrinsic_pool::{Pool, Listener, txpool::{self, Readiness, scoring::{Change, Choice}}};
|
||||||
use extrinsic_pool::api::ExtrinsicPool;
|
use extrinsic_pool::api::ExtrinsicPool;
|
||||||
use polkadot_api::PolkadotApi;
|
use polkadot_api::PolkadotApi;
|
||||||
use primitives::{AccountId, AccountIndex, Hash, Index, UncheckedExtrinsic as FutureProofUncheckedExtrinsic};
|
use primitives::{AccountId, BlockId, Hash, Index, UncheckedExtrinsic as FutureProofUncheckedExtrinsic};
|
||||||
use runtime::{Address, RawAddress, UncheckedExtrinsic};
|
use runtime::{Address, UncheckedExtrinsic};
|
||||||
use substrate_runtime_primitives::traits::{Bounded, Checkable, Hashing, BlakeTwo256};
|
use substrate_runtime_primitives::traits::{Bounded, Checkable, Hashing, BlakeTwo256};
|
||||||
|
|
||||||
pub use extrinsic_pool::txpool::{Options, Status, LightStatus, VerifiedTransaction as VerifiedTransactionOps};
|
pub use extrinsic_pool::txpool::{Options, Status, LightStatus, VerifiedTransaction as VerifiedTransactionOps};
|
||||||
@@ -59,65 +58,16 @@ pub use error::{Error, ErrorKind, Result};
|
|||||||
pub type CheckedExtrinsic = <UncheckedExtrinsic as Checkable>::Checked;
|
pub type CheckedExtrinsic = <UncheckedExtrinsic as Checkable>::Checked;
|
||||||
|
|
||||||
/// A verified transaction which should be includable and non-inherent.
|
/// A verified transaction which should be includable and non-inherent.
|
||||||
#[derive(Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct VerifiedTransaction {
|
pub struct VerifiedTransaction {
|
||||||
original: UncheckedExtrinsic,
|
original: UncheckedExtrinsic,
|
||||||
// `create()` will leave this as `Some` only if the `Address` is an `AccountId`, otherwise a
|
inner: Option<CheckedExtrinsic>,
|
||||||
// call to `polish` is needed.
|
sender: Option<AccountId>,
|
||||||
inner: Mutex<Option<CheckedExtrinsic>>,
|
|
||||||
hash: Hash,
|
hash: Hash,
|
||||||
encoded_size: usize,
|
encoded_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Clone for VerifiedTransaction {
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
VerifiedTransaction {
|
|
||||||
original: self.original.clone(),
|
|
||||||
inner: Mutex::new(self.inner.lock().clone()),
|
|
||||||
hash: self.hash.clone(),
|
|
||||||
encoded_size: self.encoded_size.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl VerifiedTransaction {
|
impl VerifiedTransaction {
|
||||||
/// Attempt to verify a transaction.
|
|
||||||
fn create(original: UncheckedExtrinsic) -> Result<Self> {
|
|
||||||
if !original.is_signed() {
|
|
||||||
bail!(ErrorKind::IsInherent(original))
|
|
||||||
}
|
|
||||||
const UNAVAILABLE_MESSAGE: &'static str = "chain state not available";
|
|
||||||
let (encoded_size, hash) = original.using_encoded(|e| (e.len(), BlakeTwo256::hash(e)));
|
|
||||||
let lookup = |a| match a {
|
|
||||||
RawAddress::Id(i) => Ok(i),
|
|
||||||
_ => Err(UNAVAILABLE_MESSAGE),
|
|
||||||
};
|
|
||||||
let inner = Mutex::new(match original.clone().check(lookup) {
|
|
||||||
Ok(xt) => Some(xt),
|
|
||||||
Err(e) if e == UNAVAILABLE_MESSAGE => None,
|
|
||||||
Err(e) => bail!(ErrorKind::BadSignature(e)),
|
|
||||||
});
|
|
||||||
Ok(VerifiedTransaction { original, inner, hash, encoded_size })
|
|
||||||
}
|
|
||||||
|
|
||||||
/// If this transaction isn't really verified, verify it and morph it into a really verified
|
|
||||||
/// transaction.
|
|
||||||
pub fn polish<F>(&self, lookup: F) -> Result<()> where
|
|
||||||
F: FnOnce(Address) -> result::Result<AccountId, &'static str> + Send + Sync
|
|
||||||
{
|
|
||||||
let inner: result::Result<CheckedExtrinsic, Error> = self.original
|
|
||||||
.clone()
|
|
||||||
.check(lookup)
|
|
||||||
.map_err(|e| ErrorKind::BadSignature(e).into());
|
|
||||||
*self.inner.lock() = Some(inner?);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Is this transaction *really* verified?
|
|
||||||
pub fn is_really_verified(&self) -> bool {
|
|
||||||
self.inner.lock().is_some()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Access the underlying transaction.
|
/// Access the underlying transaction.
|
||||||
pub fn as_transaction(&self) -> &UncheckedExtrinsic {
|
pub fn as_transaction(&self) -> &UncheckedExtrinsic {
|
||||||
&self.original
|
&self.original
|
||||||
@@ -129,9 +79,9 @@ impl VerifiedTransaction {
|
|||||||
.expect("UncheckedExtrinsic shares repr with Vec<u8>; qed")
|
.expect("UncheckedExtrinsic shares repr with Vec<u8>; qed")
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Consume the verified transaciton, yielding the unchecked counterpart.
|
/// Consume the verified transaction, yielding the checked counterpart.
|
||||||
pub fn into_inner(self) -> Result<CheckedExtrinsic> {
|
pub fn into_inner(self) -> Option<CheckedExtrinsic> {
|
||||||
self.inner.lock().clone().ok_or_else(|| ErrorKind::NotReady.into())
|
self.inner
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the 256-bit hash of this transaction.
|
/// Get the 256-bit hash of this transaction.
|
||||||
@@ -140,8 +90,8 @@ impl VerifiedTransaction {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get the account ID of the sender of this transaction.
|
/// Get the account ID of the sender of this transaction.
|
||||||
pub fn sender(&self) -> Result<AccountId> {
|
pub fn sender(&self) -> Option<AccountId> {
|
||||||
self.inner.lock().as_ref().map(|i| i.signed.clone()).ok_or_else(|| ErrorKind::NotReady.into())
|
self.sender
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the account ID of the sender of this transaction.
|
/// Get the account ID of the sender of this transaction.
|
||||||
@@ -153,22 +103,27 @@ impl VerifiedTransaction {
|
|||||||
pub fn encoded_size(&self) -> usize {
|
pub fn encoded_size(&self) -> usize {
|
||||||
self.encoded_size
|
self.encoded_size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns `true` if the transaction is not yet fully verified.
|
||||||
|
pub fn is_fully_verified(&self) -> bool {
|
||||||
|
self.inner.is_some()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl txpool::VerifiedTransaction for VerifiedTransaction {
|
impl txpool::VerifiedTransaction for VerifiedTransaction {
|
||||||
type Hash = Hash;
|
type Hash = Hash;
|
||||||
type Sender = Address;
|
type Sender = Option<AccountId>;
|
||||||
|
|
||||||
fn hash(&self) -> &Self::Hash {
|
fn hash(&self) -> &Self::Hash {
|
||||||
&self.hash
|
&self.hash
|
||||||
}
|
}
|
||||||
|
|
||||||
fn sender(&self) -> &Self::Sender {
|
fn sender(&self) -> &Self::Sender {
|
||||||
self.original.sender()
|
&self.sender
|
||||||
}
|
}
|
||||||
|
|
||||||
fn mem_usage(&self) -> usize {
|
fn mem_usage(&self) -> usize {
|
||||||
1 // TODO
|
self.encoded_size // TODO
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -184,7 +139,19 @@ impl txpool::Scoring<VerifiedTransaction> for Scoring {
|
|||||||
old.index().cmp(&other.index())
|
old.index().cmp(&other.index())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn choose(&self, _old: &VerifiedTransaction, _new: &VerifiedTransaction) -> Choice {
|
fn choose(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> Choice {
|
||||||
|
if old.is_fully_verified() {
|
||||||
|
assert!(new.is_fully_verified(), "Scoring::choose called with transactions from different senders");
|
||||||
|
if old.index() == new.index() {
|
||||||
|
// TODO [ToDr] Do we allow replacement? If yes then it should be Choice::ReplaceOld
|
||||||
|
return Choice::RejectNew;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will keep both transactions, even though they have the same indices.
|
||||||
|
// It's fine for not fully verified transactions, we might also allow it for
|
||||||
|
// verified transactions but it would mean that only one of the two is actually valid
|
||||||
|
// (most likely the first to be included in the block).
|
||||||
Choice::InsertNew
|
Choice::InsertNew
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -195,33 +162,37 @@ impl txpool::Scoring<VerifiedTransaction> for Scoring {
|
|||||||
_change: Change<()>
|
_change: Change<()>
|
||||||
) {
|
) {
|
||||||
for i in 0..xts.len() {
|
for i in 0..xts.len() {
|
||||||
// all the same score since there are no fees.
|
if !xts[i].is_fully_verified() {
|
||||||
// TODO: prioritize things like misbehavior or fishermen reports
|
scores[i] = 0;
|
||||||
scores[i] = 1;
|
} else {
|
||||||
|
// all the same score since there are no fees.
|
||||||
|
// TODO: prioritize things like misbehavior or fishermen reports
|
||||||
|
scores[i] = 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fn should_replace(&self, _old: &VerifiedTransaction, _new: &VerifiedTransaction) -> bool {
|
|
||||||
false // no fees to determine which is better.
|
fn should_replace(&self, old: &VerifiedTransaction, _new: &VerifiedTransaction) -> bool {
|
||||||
|
// Always replace not fully verified transactions.
|
||||||
|
!old.is_fully_verified()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Readiness evaluator for polkadot transactions.
|
/// Readiness evaluator for polkadot transactions.
|
||||||
pub struct Ready<'a, T: 'a + PolkadotApi> {
|
pub struct Ready<'a, A: 'a + PolkadotApi> {
|
||||||
at_block: T::CheckedBlockId,
|
at_block: A::CheckedBlockId,
|
||||||
api: &'a T,
|
api: &'a A,
|
||||||
known_nonces: HashMap<AccountId, (::primitives::Index, bool)>,
|
known_nonces: HashMap<AccountId, ::primitives::Index>,
|
||||||
known_indexes: HashMap<AccountIndex, AccountId>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T: 'a + PolkadotApi> Ready<'a, T> {
|
impl<'a, A: 'a + PolkadotApi> Ready<'a, A> {
|
||||||
/// Create a new readiness evaluator at the given block. Requires that
|
/// Create a new readiness evaluator at the given block. Requires that
|
||||||
/// the ID has already been checked for local corresponding and available state.
|
/// the ID has already been checked for local corresponding and available state.
|
||||||
pub fn create(at: T::CheckedBlockId, api: &'a T) -> Self {
|
fn create(at: A::CheckedBlockId, api: &'a A) -> Self {
|
||||||
Ready {
|
Ready {
|
||||||
at_block: at,
|
at_block: at,
|
||||||
api,
|
api,
|
||||||
known_nonces: HashMap::new(),
|
known_nonces: HashMap::new(),
|
||||||
known_indexes: HashMap::new(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -232,136 +203,214 @@ impl<'a, T: 'a + PolkadotApi> Clone for Ready<'a, T> {
|
|||||||
at_block: self.at_block.clone(),
|
at_block: self.at_block.clone(),
|
||||||
api: self.api,
|
api: self.api,
|
||||||
known_nonces: self.known_nonces.clone(),
|
known_nonces: self.known_nonces.clone(),
|
||||||
known_indexes: self.known_indexes.clone(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T: 'a + PolkadotApi> txpool::Ready<VerifiedTransaction> for Ready<'a, T>
|
impl<'a, A: 'a + PolkadotApi> txpool::Ready<VerifiedTransaction> for Ready<'a, A>
|
||||||
{
|
{
|
||||||
fn is_ready(&mut self, xt: &VerifiedTransaction) -> Readiness {
|
fn is_ready(&mut self, xt: &VerifiedTransaction) -> Readiness {
|
||||||
if !xt.is_really_verified() {
|
let sender = match xt.sender() {
|
||||||
let id = match xt.original.extrinsic.signed.clone() {
|
Some(sender) => sender,
|
||||||
RawAddress::Id(id) => id.clone(), // should never happen, since we're not verified.
|
None => return Readiness::Future
|
||||||
RawAddress::Index(i) => match self.known_indexes.entry(i) {
|
};
|
||||||
Entry::Occupied(e) => e.get().clone(),
|
|
||||||
Entry::Vacant(e) => {
|
|
||||||
let (api, at_block) = (&self.api, &self.at_block);
|
|
||||||
if let Some(id) = api.lookup(at_block, RawAddress::Index(i))
|
|
||||||
.ok()
|
|
||||||
.and_then(|o| o)
|
|
||||||
{
|
|
||||||
e.insert(id.clone());
|
|
||||||
id
|
|
||||||
} else {
|
|
||||||
// Invalid index.
|
|
||||||
// return stale in order to get the pool to throw it away.
|
|
||||||
return Readiness::Stale
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
};
|
|
||||||
if VerifiedTransaction::polish(xt, move |_| Ok(id)).is_err() {
|
|
||||||
// Invalid signature.
|
|
||||||
// return stale in order to get the pool to throw it away.
|
|
||||||
return Readiness::Stale
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// guaranteed to be properly verified at this point.
|
|
||||||
|
|
||||||
let sender = xt.sender().expect("only way to get here is `is_really_verified` or successful `polish`; either guarantees `is_really_verified`; `sender` is `Ok` if `is_really_verified`; qed");
|
|
||||||
trace!(target: "transaction-pool", "Checking readiness of {} (from {})", xt.hash, Hash::from(sender));
|
trace!(target: "transaction-pool", "Checking readiness of {} (from {})", xt.hash, Hash::from(sender));
|
||||||
|
|
||||||
let is_index_sender = match xt.original.extrinsic.signed { RawAddress::Index(_) => false, _ => true };
|
|
||||||
|
|
||||||
// TODO: find a way to handle index error properly -- will need changes to
|
// TODO: find a way to handle index error properly -- will need changes to
|
||||||
// transaction-pool trait.
|
// transaction-pool trait.
|
||||||
let (api, at_block) = (&self.api, &self.at_block);
|
let (api, at_block) = (&self.api, &self.at_block);
|
||||||
let get_nonce = || api.index(at_block, sender).ok().unwrap_or_else(Bounded::max_value);
|
let next_index = self.known_nonces.entry(sender)
|
||||||
let (next_nonce, was_index_sender) = self.known_nonces.entry(sender).or_insert_with(|| (get_nonce(), is_index_sender));
|
.or_insert_with(|| api.index(at_block, sender).ok().unwrap_or_else(Bounded::max_value));
|
||||||
|
|
||||||
trace!(target: "transaction-pool", "Next index for sender is {}; xt index is {}", next_nonce, xt.original.extrinsic.index);
|
trace!(target: "transaction-pool", "Next index for sender is {}; xt index is {}", next_index, xt.original.extrinsic.index);
|
||||||
|
|
||||||
if *was_index_sender == is_index_sender || get_nonce() == *next_nonce {
|
let result = match xt.original.extrinsic.index.cmp(&next_index) {
|
||||||
match xt.original.extrinsic.index.cmp(&next_nonce) {
|
// TODO: this won't work perfectly since accounts can now be killed, returning the nonce
|
||||||
Ordering::Greater => Readiness::Future,
|
// to zero.
|
||||||
Ordering::Less => Readiness::Stale,
|
// We should detect if the index was reset and mark all transactions as `Stale` for cull to work correctly.
|
||||||
Ordering::Equal => {
|
// Otherwise those transactions will keep occupying the queue.
|
||||||
// remember to increment `next_nonce`
|
// Perhaps we could mark as stale if `index - state_index` > X?
|
||||||
// TODO: this won't work perfectly since accounts can now be killed, returning the nonce
|
Ordering::Greater => Readiness::Future,
|
||||||
// to zero.
|
Ordering::Equal => Readiness::Ready,
|
||||||
*next_nonce = next_nonce.saturating_add(1);
|
// TODO [ToDr] Should mark transactions referrencing too old blockhash as `Stale` as well.
|
||||||
Readiness::Ready
|
Ordering::Less => Readiness::Stale,
|
||||||
}
|
};
|
||||||
}
|
|
||||||
} else {
|
// remember to increment `next_index`
|
||||||
// ignore for now.
|
*next_index = next_index.saturating_add(1);
|
||||||
Readiness::Future
|
|
||||||
|
result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Verifier<'a, A: 'a, B> {
|
||||||
|
api: &'a A,
|
||||||
|
at_block: B,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, A> Verifier<'a, A, A::CheckedBlockId> where
|
||||||
|
A: 'a + PolkadotApi,
|
||||||
|
{
|
||||||
|
const NO_ACCOUNT: &'static str = "Account not found.";
|
||||||
|
|
||||||
|
fn lookup(&self, address: Address) -> ::std::result::Result<AccountId, &'static str> {
|
||||||
|
// TODO [ToDr] Consider introducing a cache for this.
|
||||||
|
match self.api.lookup(&self.at_block, address.clone()) {
|
||||||
|
Ok(Some(address)) => Ok(address),
|
||||||
|
Ok(None) => Err(Self::NO_ACCOUNT.into()),
|
||||||
|
Err(e) => {
|
||||||
|
error!("Error looking up address: {:?}: {:?}", address, e);
|
||||||
|
Err("API error.")
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Verifier;
|
impl<'a, A> txpool::Verifier<UncheckedExtrinsic> for Verifier<'a, A, A::CheckedBlockId> where
|
||||||
|
A: 'a + PolkadotApi,
|
||||||
impl txpool::Verifier<UncheckedExtrinsic> for Verifier {
|
{
|
||||||
type VerifiedTransaction = VerifiedTransaction;
|
type VerifiedTransaction = VerifiedTransaction;
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
|
|
||||||
fn verify_transaction(&self, uxt: UncheckedExtrinsic) -> Result<Self::VerifiedTransaction> {
|
fn verify_transaction(&self, uxt: UncheckedExtrinsic) -> Result<Self::VerifiedTransaction> {
|
||||||
info!("Extrinsic Submitted: {:?}", uxt);
|
info!("Extrinsic Submitted: {:?}", uxt);
|
||||||
VerifiedTransaction::create(uxt)
|
|
||||||
|
if !uxt.is_signed() {
|
||||||
|
bail!(ErrorKind::IsInherent(uxt))
|
||||||
|
}
|
||||||
|
|
||||||
|
let (encoded_size, hash) = uxt.using_encoded(|e| (e.len(), BlakeTwo256::hash(e)));
|
||||||
|
let inner = match uxt.clone().check(|a| self.lookup(a)) {
|
||||||
|
Ok(xt) => Some(xt),
|
||||||
|
// keep the transaction around in the future pool and attempt to promote it later.
|
||||||
|
Err(Self::NO_ACCOUNT) => None,
|
||||||
|
Err(e) => bail!(e),
|
||||||
|
};
|
||||||
|
let sender = inner.as_ref().map(|x| x.signed.clone());
|
||||||
|
|
||||||
|
Ok(VerifiedTransaction {
|
||||||
|
original: uxt,
|
||||||
|
inner,
|
||||||
|
sender,
|
||||||
|
hash,
|
||||||
|
encoded_size
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The polkadot transaction pool.
|
/// The polkadot transaction pool.
|
||||||
///
|
///
|
||||||
/// Wraps a `extrinsic_pool::Pool`.
|
/// Wraps a `extrinsic_pool::Pool`.
|
||||||
pub struct TransactionPool {
|
pub struct TransactionPool<A> {
|
||||||
inner: Pool<UncheckedExtrinsic, Hash, Verifier, Scoring, Error>,
|
inner: Pool<Hash, VerifiedTransaction, Scoring, Error>,
|
||||||
|
api: Arc<A>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TransactionPool {
|
impl<A> TransactionPool<A> where
|
||||||
|
A: PolkadotApi,
|
||||||
|
{
|
||||||
/// Create a new transaction pool.
|
/// Create a new transaction pool.
|
||||||
pub fn new(options: Options) -> Self {
|
pub fn new(options: Options, api: Arc<A>) -> Self {
|
||||||
TransactionPool {
|
TransactionPool {
|
||||||
inner: Pool::new(options, Verifier, Scoring),
|
inner: Pool::new(options, Scoring),
|
||||||
|
api,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: remove. This is pointless - just use `submit()` directly.
|
/// Attempt to directly import `UncheckedExtrinsic` without going through serialization.
|
||||||
pub fn import_unchecked_extrinsic(&self, uxt: UncheckedExtrinsic) -> Result<Arc<VerifiedTransaction>> {
|
pub fn import_unchecked_extrinsic(&self, block: BlockId, uxt: UncheckedExtrinsic) -> Result<Arc<VerifiedTransaction>> {
|
||||||
self.inner.submit(vec![uxt]).map(|mut v| v.swap_remove(0))
|
let verifier = Verifier {
|
||||||
|
api: &*self.api,
|
||||||
|
at_block: self.api.check_id(block)?,
|
||||||
|
};
|
||||||
|
self.inner.submit(verifier, vec![uxt]).map(|mut v| v.swap_remove(0))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Retry to import all semi-verified transactions (unknown account indices)
|
||||||
|
pub fn retry_verification(&self, block: BlockId) -> Result<()> {
|
||||||
|
let to_reverify = self.inner.remove_sender(None);
|
||||||
|
let verifier = Verifier {
|
||||||
|
api: &*self.api,
|
||||||
|
at_block: self.api.check_id(block)?,
|
||||||
|
};
|
||||||
|
|
||||||
|
self.inner.submit(verifier, to_reverify.into_iter().map(|tx| tx.original.clone()))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reverify transaction that has been reported incorrect.
|
||||||
|
///
|
||||||
|
/// Returns `Ok(None)` in case the hash is missing, `Err(e)` in case of verification error and new transaction
|
||||||
|
/// reference otherwise.
|
||||||
|
///
|
||||||
|
/// TODO [ToDr] That method is currently unused, should be used together with BlockBuilder
|
||||||
|
/// when we detect that particular transaction has failed.
|
||||||
|
/// In such case we will attempt to remove or re-verify it.
|
||||||
|
pub fn reverify_transaction(&self, block: BlockId, hash: Hash) -> Result<Option<Arc<VerifiedTransaction>>> {
|
||||||
|
let result = self.inner.remove(&[hash], false).pop().expect("One hash passed; one result received; qed");
|
||||||
|
if let Some(tx) = result {
|
||||||
|
self.import_unchecked_extrinsic(block, tx.original.clone()).map(Some)
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cull old transactions from the queue.
|
||||||
|
pub fn cull(&self, block: BlockId) -> Result<usize> {
|
||||||
|
let id = self.api.check_id(block)?;
|
||||||
|
let ready = Ready::create(id, &*self.api);
|
||||||
|
Ok(self.inner.cull(None, ready))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cull transactions from the queue and then compute the pending set.
|
||||||
|
pub fn cull_and_get_pending<F, T>(&self, block: BlockId, f: F) -> Result<T> where
|
||||||
|
F: FnOnce(txpool::PendingIterator<VerifiedTransaction, Ready<A>, Scoring, Listener<Hash>>) -> T,
|
||||||
|
{
|
||||||
|
let id = self.api.check_id(block)?;
|
||||||
|
let ready = Ready::create(id, &*self.api);
|
||||||
|
self.inner.cull(None, ready.clone());
|
||||||
|
Ok(self.inner.pending(ready, f))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove a set of transactions idenitified by hashes.
|
||||||
|
pub fn remove(&self, hashes: &[Hash], is_valid: bool) -> Vec<Option<Arc<VerifiedTransaction>>> {
|
||||||
|
self.inner.remove(hashes, is_valid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Deref for TransactionPool {
|
impl<A> Deref for TransactionPool<A> {
|
||||||
type Target = Pool<UncheckedExtrinsic, Hash, Verifier, Scoring, Error>;
|
type Target = Pool<Hash, VerifiedTransaction, Scoring, Error>;
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
fn deref(&self) -> &Self::Target {
|
||||||
&self.inner
|
&self.inner
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ExtrinsicPool<FutureProofUncheckedExtrinsic, Hash> for TransactionPool {
|
impl<A> ExtrinsicPool<FutureProofUncheckedExtrinsic, BlockId, Hash> for TransactionPool<A> where
|
||||||
|
A: Send + Sync + 'static,
|
||||||
|
A: PolkadotApi,
|
||||||
|
{
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
|
|
||||||
fn submit(&self, xts: Vec<FutureProofUncheckedExtrinsic>) -> Result<Vec<Hash>> {
|
fn submit(&self, block: BlockId, xts: Vec<FutureProofUncheckedExtrinsic>) -> Result<Vec<Hash>> {
|
||||||
// TODO: more general transaction pool, which can handle more kinds of vec-encoded transactions,
|
// TODO: more general transaction pool, which can handle more kinds of vec-encoded transactions,
|
||||||
// even when runtime is out of date.
|
// even when runtime is out of date.
|
||||||
xts.into_iter()
|
xts.into_iter()
|
||||||
.map(|xt| xt.encode())
|
.map(|xt| xt.encode())
|
||||||
.map(|encoded| UncheckedExtrinsic::decode(&mut &encoded[..]))
|
.map(|encoded| {
|
||||||
.map(|maybe_decoded| maybe_decoded.ok_or_else(|| ErrorKind::InvalidExtrinsicFormat.into()))
|
let decoded = UncheckedExtrinsic::decode(&mut &encoded[..]).ok_or(ErrorKind::InvalidExtrinsicFormat)?;
|
||||||
.map(|x| x.and_then(|x| self.import_unchecked_extrinsic(x)))
|
let tx = self.import_unchecked_extrinsic(block, decoded)?;
|
||||||
.map(|x| x.map(|x| x.hash().clone()))
|
Ok(*tx.hash())
|
||||||
|
})
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::{TransactionPool, Ready};
|
use std::sync::{atomic::{self, AtomicBool}, Arc};
|
||||||
|
use super::TransactionPool;
|
||||||
use substrate_keyring::Keyring::{self, *};
|
use substrate_keyring::Keyring::{self, *};
|
||||||
use codec::Slicable;
|
use codec::Slicable;
|
||||||
use polkadot_api::{PolkadotApi, BlockBuilder, CheckedBlockId, Result};
|
use polkadot_api::{PolkadotApi, BlockBuilder, CheckedBlockId, Result};
|
||||||
@@ -390,8 +439,23 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Default, Clone)]
|
||||||
struct TestPolkadotApi;
|
struct TestPolkadotApi {
|
||||||
|
no_lookup: Arc<AtomicBool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TestPolkadotApi {
|
||||||
|
fn without_lookup() -> Self {
|
||||||
|
TestPolkadotApi {
|
||||||
|
no_lookup: Arc::new(AtomicBool::new(true)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn enable_lookup(&self) {
|
||||||
|
self.no_lookup.store(false, atomic::Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl PolkadotApi for TestPolkadotApi {
|
impl PolkadotApi for TestPolkadotApi {
|
||||||
type CheckedBlockId = TestCheckedBlockId;
|
type CheckedBlockId = TestCheckedBlockId;
|
||||||
type BlockBuilder = TestBlockBuilder;
|
type BlockBuilder = TestBlockBuilder;
|
||||||
@@ -415,6 +479,7 @@ mod tests {
|
|||||||
fn lookup(&self, _at: &TestCheckedBlockId, _address: RawAddress<AccountId, AccountIndex>) -> Result<Option<AccountId>> {
|
fn lookup(&self, _at: &TestCheckedBlockId, _address: RawAddress<AccountId, AccountIndex>) -> Result<Option<AccountId>> {
|
||||||
match _address {
|
match _address {
|
||||||
RawAddress::Id(i) => Ok(Some(i)),
|
RawAddress::Id(i) => Ok(Some(i)),
|
||||||
|
RawAddress::Index(_) if self.no_lookup.load(atomic::Ordering::SeqCst) => Ok(None),
|
||||||
RawAddress::Index(i) => Ok(match (i < 8, i + (number_of(_at) as u64) % 8) {
|
RawAddress::Index(i) => Ok(match (i < 8, i + (number_of(_at) as u64) % 8) {
|
||||||
(false, _) => None,
|
(false, _) => None,
|
||||||
(_, 0) => Some(Alice.to_raw_public().into()),
|
(_, 0) => Some(Alice.to_raw_public().into()),
|
||||||
@@ -456,130 +521,168 @@ mod tests {
|
|||||||
}, MaybeUnsigned(sig.into())).using_encoded(|e| UncheckedExtrinsic::decode(&mut &e[..])).unwrap()
|
}, MaybeUnsigned(sig.into())).using_encoded(|e| UncheckedExtrinsic::decode(&mut &e[..])).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn pool(api: &TestPolkadotApi) -> TransactionPool<TestPolkadotApi> {
|
||||||
|
TransactionPool::new(Default::default(), Arc::new(api.clone()))
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn id_submission_should_work() {
|
fn id_submission_should_work() {
|
||||||
let pool = TransactionPool::new(Default::default());
|
let api = TestPolkadotApi::default();
|
||||||
pool.submit(vec![uxt(Alice, 209, true)]).unwrap();
|
let pool = pool(&api);
|
||||||
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, true)).unwrap();
|
||||||
|
|
||||||
let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi);
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect());
|
|
||||||
assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209)]);
|
assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn index_submission_should_work() {
|
fn index_submission_should_work() {
|
||||||
let pool = TransactionPool::new(Default::default());
|
let api = TestPolkadotApi::default();
|
||||||
pool.submit(vec![uxt(Alice, 209, false)]).unwrap();
|
let pool = pool(&api);
|
||||||
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap();
|
||||||
|
|
||||||
let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi);
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect());
|
|
||||||
assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209)]);
|
assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn multiple_id_submission_should_work() {
|
fn multiple_id_submission_should_work() {
|
||||||
let pool = TransactionPool::new(Default::default());
|
let api = TestPolkadotApi::default();
|
||||||
pool.submit(vec![uxt(Alice, 209, true)]).unwrap();
|
let pool = pool(&api);
|
||||||
pool.submit(vec![uxt(Alice, 210, true)]).unwrap();
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, true)).unwrap();
|
||||||
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, true)).unwrap();
|
||||||
|
|
||||||
let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi);
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect());
|
|
||||||
assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]);
|
assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn multiple_index_submission_should_work() {
|
fn multiple_index_submission_should_work() {
|
||||||
let pool = TransactionPool::new(Default::default());
|
let api = TestPolkadotApi::default();
|
||||||
pool.submit(vec![uxt(Alice, 209, false)]).unwrap();
|
let pool = pool(&api);
|
||||||
pool.submit(vec![uxt(Alice, 210, false)]).unwrap();
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap();
|
||||||
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, false)).unwrap();
|
||||||
|
|
||||||
let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi);
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect());
|
|
||||||
assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]);
|
assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn id_based_early_nonce_should_be_culled() {
|
fn id_based_early_nonce_should_be_culled() {
|
||||||
let pool = TransactionPool::new(Default::default());
|
let api = TestPolkadotApi::default();
|
||||||
pool.submit(vec![uxt(Alice, 208, true)]).unwrap();
|
let pool = pool(&api);
|
||||||
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 208, true)).unwrap();
|
||||||
|
|
||||||
let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi);
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect());
|
|
||||||
assert_eq!(pending, vec![]);
|
assert_eq!(pending, vec![]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn index_based_early_nonce_should_be_culled() {
|
fn index_based_early_nonce_should_be_culled() {
|
||||||
let pool = TransactionPool::new(Default::default());
|
let api = TestPolkadotApi::default();
|
||||||
pool.submit(vec![uxt(Alice, 208, false)]).unwrap();
|
let pool = pool(&api);
|
||||||
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 208, false)).unwrap();
|
||||||
|
|
||||||
let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi);
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect());
|
|
||||||
assert_eq!(pending, vec![]);
|
assert_eq!(pending, vec![]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn id_based_late_nonce_should_be_queued() {
|
fn id_based_late_nonce_should_be_queued() {
|
||||||
let pool = TransactionPool::new(Default::default());
|
let api = TestPolkadotApi::default();
|
||||||
let ready = || Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi);
|
let pool = pool(&api);
|
||||||
|
|
||||||
pool.submit(vec![uxt(Alice, 210, true)]).unwrap();
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, true)).unwrap();
|
||||||
let pending: Vec<_> = pool.cull_and_get_pending(ready(), |p| p.map(|a| (a.sender().ok(), a.index())).collect());
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
assert_eq!(pending, vec![]);
|
assert_eq!(pending, vec![]);
|
||||||
|
|
||||||
pool.submit(vec![uxt(Alice, 209, true)]).unwrap();
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, true)).unwrap();
|
||||||
let pending: Vec<_> = pool.cull_and_get_pending(ready(), |p| p.map(|a| (a.sender().ok(), a.index())).collect());
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]);
|
assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn index_based_late_nonce_should_be_queued() {
|
fn index_based_late_nonce_should_be_queued() {
|
||||||
let pool = TransactionPool::new(Default::default());
|
let api = TestPolkadotApi::default();
|
||||||
let ready = || Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi);
|
let pool = pool(&api);
|
||||||
|
|
||||||
pool.submit(vec![uxt(Alice, 210, false)]).unwrap();
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, false)).unwrap();
|
||||||
let pending: Vec<_> = pool.cull_and_get_pending(ready(), |p| p.map(|a| (a.sender().ok(), a.index())).collect());
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
assert_eq!(pending, vec![]);
|
assert_eq!(pending, vec![]);
|
||||||
|
|
||||||
pool.submit(vec![uxt(Alice, 209, false)]).unwrap();
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap();
|
||||||
let pending: Vec<_> = pool.cull_and_get_pending(ready(), |p| p.map(|a| (a.sender().ok(), a.index())).collect());
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]);
|
assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn index_then_id_submission_should_make_progress() {
|
fn index_then_id_submission_should_make_progress() {
|
||||||
let pool = TransactionPool::new(Default::default());
|
let api = TestPolkadotApi::without_lookup();
|
||||||
pool.submit(vec![uxt(Alice, 209, false)]).unwrap();
|
let pool = pool(&api);
|
||||||
pool.submit(vec![uxt(Alice, 210, true)]).unwrap();
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap();
|
||||||
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, true)).unwrap();
|
||||||
|
|
||||||
let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi);
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect());
|
assert_eq!(pending, vec![]);
|
||||||
|
|
||||||
|
api.enable_lookup();
|
||||||
|
pool.retry_verification(BlockId::number(0)).unwrap();
|
||||||
|
|
||||||
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
assert_eq!(pending, vec![
|
assert_eq!(pending, vec![
|
||||||
(Some(Alice.to_raw_public().into()), 209)
|
(Some(Alice.to_raw_public().into()), 209),
|
||||||
|
(Some(Alice.to_raw_public().into()), 210)
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn id_then_index_submission_should_make_progress() {
|
fn retrying_verification_might_not_change_anything() {
|
||||||
let pool = TransactionPool::new(Default::default());
|
let api = TestPolkadotApi::without_lookup();
|
||||||
pool.submit(vec![uxt(Alice, 209, true)]).unwrap();
|
let pool = pool(&api);
|
||||||
pool.submit(vec![uxt(Alice, 210, false)]).unwrap();
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap();
|
||||||
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, true)).unwrap();
|
||||||
|
|
||||||
let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi);
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect());
|
assert_eq!(pending, vec![]);
|
||||||
|
|
||||||
|
pool.retry_verification(BlockId::number(1)).unwrap();
|
||||||
|
|
||||||
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
|
assert_eq!(pending, vec![]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn id_then_index_submission_should_make_progress() {
|
||||||
|
let api = TestPolkadotApi::without_lookup();
|
||||||
|
let pool = pool(&api);
|
||||||
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, true)).unwrap();
|
||||||
|
pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, false)).unwrap();
|
||||||
|
|
||||||
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
assert_eq!(pending, vec![
|
assert_eq!(pending, vec![
|
||||||
(Some(Alice.to_raw_public().into()), 209)
|
(Some(Alice.to_raw_public().into()), 209)
|
||||||
]);
|
]);
|
||||||
|
|
||||||
|
// when
|
||||||
|
api.enable_lookup();
|
||||||
|
pool.retry_verification(BlockId::number(0)).unwrap();
|
||||||
|
|
||||||
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
|
assert_eq!(pending, vec![
|
||||||
|
(Some(Alice.to_raw_public().into()), 209),
|
||||||
|
(Some(Alice.to_raw_public().into()), 210)
|
||||||
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn index_change_should_result_in_second_tx_culled_or_future() {
|
fn index_change_should_result_in_second_tx_culled_or_future() {
|
||||||
let pool = TransactionPool::new(Default::default());
|
let api = TestPolkadotApi::default();
|
||||||
pool.submit(vec![uxt(Alice, 209, false)]).unwrap();
|
let pool = pool(&api);
|
||||||
pool.submit(vec![uxt(Alice, 210, false)]).unwrap();
|
let block = BlockId::number(0);
|
||||||
|
pool.import_unchecked_extrinsic(block, uxt(Alice, 209, false)).unwrap();
|
||||||
|
let hash = *pool.import_unchecked_extrinsic(block, uxt(Alice, 210, false)).unwrap().hash();
|
||||||
|
|
||||||
let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi);
|
let pending: Vec<_> = pool.cull_and_get_pending(block, |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect());
|
|
||||||
assert_eq!(pending, vec![
|
assert_eq!(pending, vec![
|
||||||
(Some(Alice.to_raw_public().into()), 209),
|
(Some(Alice.to_raw_public().into()), 209),
|
||||||
(Some(Alice.to_raw_public().into()), 210)
|
(Some(Alice.to_raw_public().into()), 210)
|
||||||
@@ -593,11 +696,14 @@ mod tests {
|
|||||||
|
|
||||||
// after this, a re-evaluation of the second's readiness should result in it being thrown
|
// after this, a re-evaluation of the second's readiness should result in it being thrown
|
||||||
// out (or maybe placed in future queue).
|
// out (or maybe placed in future queue).
|
||||||
/*
|
let err = pool.reverify_transaction(BlockId::number(1), hash).unwrap_err();
|
||||||
// TODO: uncomment once the new queue design is in.
|
match *err.kind() {
|
||||||
let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(1)).unwrap(), &TestPolkadotApi);
|
::error::ErrorKind::Msg(ref m) if m == "bad signature in extrinsic" => {},
|
||||||
let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect());
|
ref e => assert!(false, "The transaction should be rejected with BadSignature error, got: {:?}", e),
|
||||||
|
}
|
||||||
|
|
||||||
|
let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(1), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap();
|
||||||
assert_eq!(pending, vec![]);
|
assert_eq!(pending, vec![]);
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -155,7 +155,7 @@ pub fn new_in_mem<E, Block, S>(
|
|||||||
Client::new(backend, executor, genesis_storage)
|
Client::new(backend, executor, genesis_storage)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B, E, Block: BlockT> Client<B, E, Block> where
|
impl<B, E, Block> Client<B, E, Block> where
|
||||||
B: backend::Backend<Block>,
|
B: backend::Backend<Block>,
|
||||||
E: CallExecutor<Block>,
|
E: CallExecutor<Block>,
|
||||||
Block: BlockT,
|
Block: BlockT,
|
||||||
|
|||||||
@@ -16,9 +16,7 @@
|
|||||||
|
|
||||||
//! External API for extrinsic pool.
|
//! External API for extrinsic pool.
|
||||||
|
|
||||||
use std::fmt;
|
use txpool;
|
||||||
use std::ops::Deref;
|
|
||||||
use txpool::{self, VerifiedTransaction};
|
|
||||||
|
|
||||||
/// Extrinsic pool error.
|
/// Extrinsic pool error.
|
||||||
pub trait Error: ::std::error::Error + Send + Sized {
|
pub trait Error: ::std::error::Error + Send + Sized {
|
||||||
@@ -35,28 +33,10 @@ impl Error for txpool::Error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Extrinsic pool.
|
/// Extrinsic pool.
|
||||||
pub trait ExtrinsicPool<Ex, Hash>: Send + Sync + 'static {
|
pub trait ExtrinsicPool<Ex, BlockId, Hash>: Send + Sync + 'static {
|
||||||
/// Error type
|
/// Error type
|
||||||
type Error: Error;
|
type Error: Error;
|
||||||
|
|
||||||
/// Submit a collection of extrinsics to the pool.
|
/// Submit a collection of extrinsics to the pool.
|
||||||
fn submit(&self, xt: Vec<Ex>) -> Result<Vec<Hash>, Self::Error>;
|
fn submit(&self, block: BlockId, xt: Vec<Ex>) -> Result<Vec<Hash>, Self::Error>;
|
||||||
}
|
|
||||||
|
|
||||||
// Blanket implementation for anything that `Derefs` to the pool.
|
|
||||||
impl<Ex, Hash, V, S, E, T> ExtrinsicPool<Ex, Hash> for T where
|
|
||||||
Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default,
|
|
||||||
T: Deref<Target=super::Pool<Ex, Hash, V, S, E>> + Send + Sync + 'static,
|
|
||||||
V: txpool::Verifier<Ex>,
|
|
||||||
S: txpool::Scoring<V::VerifiedTransaction>,
|
|
||||||
V::VerifiedTransaction: txpool::VerifiedTransaction<Hash=Hash>,
|
|
||||||
E: From<V::Error>,
|
|
||||||
E: From<txpool::Error>,
|
|
||||||
E: Error,
|
|
||||||
{
|
|
||||||
type Error = E;
|
|
||||||
|
|
||||||
fn submit(&self, xt: Vec<Ex>) -> Result<Vec<Hash>, Self::Error> {
|
|
||||||
self.deref().submit(xt).map(|result| result.into_iter().map(|xt| *xt.hash()).collect())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,5 +32,6 @@ mod listener;
|
|||||||
mod pool;
|
mod pool;
|
||||||
mod watcher;
|
mod watcher;
|
||||||
|
|
||||||
|
pub use self::listener::Listener;
|
||||||
pub use self::pool::Pool;
|
pub use self::pool::Pool;
|
||||||
pub use self::watcher::Watcher;
|
pub use self::watcher::Watcher;
|
||||||
|
|||||||
@@ -23,17 +23,22 @@ use txpool;
|
|||||||
|
|
||||||
use watcher;
|
use watcher;
|
||||||
|
|
||||||
|
/// Extrinsic pool default listener.
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct Listener<H: ::std::hash::Hash + Eq> {
|
pub struct Listener<H: ::std::hash::Hash + Eq> {
|
||||||
watchers: HashMap<H, watcher::Sender<H>>
|
watchers: HashMap<H, watcher::Sender<H>>
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default> Listener<H> {
|
impl<H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default> Listener<H> {
|
||||||
|
/// Creates a new watcher for given verified extrinsic.
|
||||||
|
///
|
||||||
|
/// The watcher can be used to subscribe to lifecycle events of that extrinsic.
|
||||||
pub fn create_watcher<T: txpool::VerifiedTransaction<Hash=H>>(&mut self, xt: Arc<T>) -> watcher::Watcher<H> {
|
pub fn create_watcher<T: txpool::VerifiedTransaction<Hash=H>>(&mut self, xt: Arc<T>) -> watcher::Watcher<H> {
|
||||||
let sender = self.watchers.entry(*xt.hash()).or_insert_with(watcher::Sender::default);
|
let sender = self.watchers.entry(*xt.hash()).or_insert_with(watcher::Sender::default);
|
||||||
sender.new_watcher()
|
sender.new_watcher()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Notify the listeners about extrinsic broadcast.
|
||||||
pub fn broadcasted(&mut self, hash: &H, peers: Vec<String>) {
|
pub fn broadcasted(&mut self, hash: &H, peers: Vec<String>) {
|
||||||
self.fire(hash, |watcher| watcher.broadcast(peers));
|
self.fire(hash, |watcher| watcher.broadcast(peers));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,41 +29,37 @@ use listener::Listener;
|
|||||||
use watcher::Watcher;
|
use watcher::Watcher;
|
||||||
|
|
||||||
/// Extrinsics pool.
|
/// Extrinsics pool.
|
||||||
pub struct Pool<Ex, Hash, V, S, E> where
|
pub struct Pool<Hash, VEx, S, E> where
|
||||||
Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex,
|
Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex,
|
||||||
V: txpool::Verifier<Ex>,
|
S: txpool::Scoring<VEx>,
|
||||||
S: txpool::Scoring<V::VerifiedTransaction>,
|
VEx: txpool::VerifiedTransaction<Hash=Hash>,
|
||||||
{
|
{
|
||||||
_error: Mutex<PhantomData<E>>,
|
_error: Mutex<PhantomData<E>>,
|
||||||
pool: RwLock<txpool::Pool<
|
pool: RwLock<txpool::Pool<
|
||||||
V::VerifiedTransaction,
|
VEx,
|
||||||
S,
|
S,
|
||||||
Listener<Hash>,
|
Listener<Hash>,
|
||||||
>>,
|
>>,
|
||||||
verifier: V,
|
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<Weak<VEx>>>>,
|
||||||
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<Weak<V::VerifiedTransaction>>>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Ex, Hash, V, S, E> Pool<Ex, Hash, V, S, E> where
|
impl<Hash, VEx, S, E> Pool<Hash, VEx, S, E> where
|
||||||
Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default,
|
Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default,
|
||||||
V: txpool::Verifier<Ex>,
|
S: txpool::Scoring<VEx>,
|
||||||
S: txpool::Scoring<V::VerifiedTransaction>,
|
VEx: txpool::VerifiedTransaction<Hash=Hash>,
|
||||||
V::VerifiedTransaction: txpool::VerifiedTransaction<Hash=Hash>,
|
|
||||||
E: From<V::Error>,
|
|
||||||
E: From<txpool::Error>,
|
E: From<txpool::Error>,
|
||||||
{
|
{
|
||||||
/// Create a new transaction pool.
|
/// Create a new transaction pool.
|
||||||
pub fn new(options: txpool::Options, verifier: V, scoring: S) -> Self {
|
pub fn new(options: txpool::Options, scoring: S) -> Self {
|
||||||
Pool {
|
Pool {
|
||||||
_error: Default::default(),
|
_error: Default::default(),
|
||||||
pool: RwLock::new(txpool::Pool::new(Listener::default(), scoring, options)),
|
pool: RwLock::new(txpool::Pool::new(Listener::default(), scoring, options)),
|
||||||
verifier,
|
|
||||||
import_notification_sinks: Default::default(),
|
import_notification_sinks: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Imports a pre-verified extrinsic to the pool.
|
/// Imports a pre-verified extrinsic to the pool.
|
||||||
pub fn import(&self, xt: V::VerifiedTransaction) -> Result<Arc<V::VerifiedTransaction>, E> {
|
pub fn import(&self, xt: VEx) -> Result<Arc<VEx>, E> {
|
||||||
let result = self.pool.write().import(xt)?;
|
let result = self.pool.write().import(xt)?;
|
||||||
|
|
||||||
let weak = Arc::downgrade(&result);
|
let weak = Arc::downgrade(&result);
|
||||||
@@ -74,7 +70,7 @@ impl<Ex, Hash, V, S, E> Pool<Ex, Hash, V, S, E> where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Return an event stream of transactions imported to the pool.
|
/// Return an event stream of transactions imported to the pool.
|
||||||
pub fn import_notification_stream(&self) -> mpsc::UnboundedReceiver<Weak<V::VerifiedTransaction>> {
|
pub fn import_notification_stream(&self) -> mpsc::UnboundedReceiver<Weak<VEx>> {
|
||||||
let (sink, stream) = mpsc::unbounded();
|
let (sink, stream) = mpsc::unbounded();
|
||||||
self.import_notification_sinks.lock().push(sink);
|
self.import_notification_sinks.lock().push(sink);
|
||||||
stream
|
stream
|
||||||
@@ -87,11 +83,15 @@ impl<Ex, Hash, V, S, E> Pool<Ex, Hash, V, S, E> where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Imports a bunch of extrinsics to the pool
|
/// Imports a bunch of unverified extrinsics to the pool
|
||||||
pub fn submit(&self, xts: Vec<Ex>) -> Result<Vec<Arc<V::VerifiedTransaction>>, E> {
|
pub fn submit<V, Ex, T>(&self, verifier: V, xts: T) -> Result<Vec<Arc<VEx>>, E> where
|
||||||
|
V: txpool::Verifier<Ex, VerifiedTransaction=VEx>,
|
||||||
|
E: From<V::Error>,
|
||||||
|
T: IntoIterator<Item=Ex>
|
||||||
|
{
|
||||||
xts
|
xts
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|xt| self.verifier.verify_transaction(xt))
|
.map(|xt| verifier.verify_transaction(xt))
|
||||||
.map(|xt| {
|
.map(|xt| {
|
||||||
Ok(self.pool.write().import(xt?)?)
|
Ok(self.pool.write().import(xt?)?)
|
||||||
})
|
})
|
||||||
@@ -99,13 +99,16 @@ impl<Ex, Hash, V, S, E> Pool<Ex, Hash, V, S, E> where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Import a single extrinsic and starts to watch their progress in the pool.
|
/// Import a single extrinsic and starts to watch their progress in the pool.
|
||||||
pub fn submit_and_watch(&self, xt: Ex) -> Result<Watcher<Hash>, E> {
|
pub fn submit_and_watch<V, Ex>(&self, verifier: V, xt: Ex) -> Result<Watcher<Hash>, E> where
|
||||||
let xt = self.submit(vec![xt])?.pop().expect("One extrinsic passed; one result returned; qed");
|
V: txpool::Verifier<Ex, VerifiedTransaction=VEx>,
|
||||||
|
E: From<V::Error>,
|
||||||
|
{
|
||||||
|
let xt = self.submit(verifier, vec![xt])?.pop().expect("One extrinsic passed; one result returned; qed");
|
||||||
Ok(self.pool.write().listener_mut().create_watcher(xt))
|
Ok(self.pool.write().listener_mut().create_watcher(xt))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove from the pool.
|
/// Remove from the pool.
|
||||||
pub fn remove(&self, hashes: &[Hash], is_valid: bool) -> Vec<Option<Arc<V::VerifiedTransaction>>> {
|
pub fn remove(&self, hashes: &[Hash], is_valid: bool) -> Vec<Option<Arc<VEx>>> {
|
||||||
let mut pool = self.pool.write();
|
let mut pool = self.pool.write();
|
||||||
let mut results = Vec::with_capacity(hashes.len());
|
let mut results = Vec::with_capacity(hashes.len());
|
||||||
for hash in hashes {
|
for hash in hashes {
|
||||||
@@ -115,24 +118,14 @@ impl<Ex, Hash, V, S, E> Pool<Ex, Hash, V, S, E> where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Cull transactions from the queue.
|
/// Cull transactions from the queue.
|
||||||
pub fn cull<R>(&self, senders: Option<&[<V::VerifiedTransaction as txpool::VerifiedTransaction>::Sender]>, ready: R) -> usize where
|
pub fn cull<R>(&self, senders: Option<&[<VEx as txpool::VerifiedTransaction>::Sender]>, ready: R) -> usize where
|
||||||
R: txpool::Ready<V::VerifiedTransaction>,
|
R: txpool::Ready<VEx>,
|
||||||
{
|
{
|
||||||
self.pool.write().cull(senders, ready)
|
self.pool.write().cull(senders, ready)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Cull transactions from the queue and then compute the pending set.
|
|
||||||
pub fn cull_and_get_pending<R, F, T>(&self, ready: R, f: F) -> T where
|
|
||||||
R: txpool::Ready<V::VerifiedTransaction> + Clone,
|
|
||||||
F: FnOnce(txpool::PendingIterator<V::VerifiedTransaction, R, S, Listener<Hash>>) -> T,
|
|
||||||
{
|
|
||||||
let mut pool = self.pool.write();
|
|
||||||
pool.cull(None, ready.clone());
|
|
||||||
f(pool.pending(ready))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get the full status of the queue (including readiness)
|
/// Get the full status of the queue (including readiness)
|
||||||
pub fn status<R: txpool::Ready<V::VerifiedTransaction>>(&self, ready: R) -> txpool::Status {
|
pub fn status<R: txpool::Ready<VEx>>(&self, ready: R) -> txpool::Status {
|
||||||
self.pool.read().status(ready)
|
self.pool.read().status(ready)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -140,4 +133,21 @@ impl<Ex, Hash, V, S, E> Pool<Ex, Hash, V, S, E> where
|
|||||||
pub fn light_status(&self) -> txpool::LightStatus {
|
pub fn light_status(&self) -> txpool::LightStatus {
|
||||||
self.pool.read().light_status()
|
self.pool.read().light_status()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Removes all transactions from given sender
|
||||||
|
pub fn remove_sender(&self, sender: VEx::Sender) -> Vec<Arc<VEx>> {
|
||||||
|
let mut pool = self.pool.write();
|
||||||
|
let pending = pool.pending_from_sender(|_: &VEx| txpool::Readiness::Ready, &sender).collect();
|
||||||
|
// remove all transactions from this sender
|
||||||
|
pool.cull(Some(&[sender]), |_: &VEx| txpool::Readiness::Stale);
|
||||||
|
pending
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Retrieve the pending set. Be careful to not leak the pool `ReadGuard` to prevent deadlocks.
|
||||||
|
pub fn pending<R, F, T>(&self, ready: R, f: F) -> T where
|
||||||
|
R: txpool::Ready<VEx>,
|
||||||
|
F: FnOnce(txpool::PendingIterator<VEx, R, S, Listener<Hash>>) -> T,
|
||||||
|
{
|
||||||
|
f(self.pool.read().pending(ready))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,8 +17,13 @@
|
|||||||
//! Substrate block-author/full-node API.
|
//! Substrate block-author/full-node API.
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use client::{self, Client};
|
||||||
use extrinsic_pool::api::{Error, ExtrinsicPool};
|
use extrinsic_pool::api::{Error, ExtrinsicPool};
|
||||||
|
|
||||||
|
use runtime_primitives::{generic, traits::Block as BlockT};
|
||||||
|
use state_machine;
|
||||||
|
|
||||||
pub mod error;
|
pub mod error;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -35,13 +40,34 @@ build_rpc_trait! {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Ex, Hash, T> AuthorApi<Hash, Ex> for Arc<T> where
|
/// Authoring API
|
||||||
T: ExtrinsicPool<Ex, Hash>,
|
pub struct Author<B, E, Block: BlockT, P> {
|
||||||
T::Error: 'static,
|
/// Substrate client
|
||||||
|
client: Arc<Client<B, E, Block>>,
|
||||||
|
/// Extrinsic pool
|
||||||
|
pool: Arc<P>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B, E, Block: BlockT, P> Author<B, E, Block, P> {
|
||||||
|
/// Create new instance of Authoring API.
|
||||||
|
pub fn new(client: Arc<Client<B, E, Block>>, pool: Arc<P>) -> Self {
|
||||||
|
Author { client, pool }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl<B, E, Block, P, Ex, Hash> AuthorApi<Hash, Ex> for Author<B, E, Block, P> where
|
||||||
|
B: client::backend::Backend<Block> + Send + Sync + 'static,
|
||||||
|
E: client::CallExecutor<Block> + Send + Sync + 'static,
|
||||||
|
Block: BlockT + 'static,
|
||||||
|
client::error::Error: From<<<B as client::backend::Backend<Block>>::State as state_machine::backend::Backend>::Error>,
|
||||||
|
P: ExtrinsicPool<Ex, generic::BlockId<Block>, Hash>,
|
||||||
|
P::Error: 'static,
|
||||||
{
|
{
|
||||||
fn submit_extrinsic(&self, xt: Ex) -> Result<Hash> {
|
fn submit_extrinsic(&self, xt: Ex) -> Result<Hash> {
|
||||||
self
|
let best_block_hash = self.client.info().unwrap().chain.best_hash;
|
||||||
.submit(vec![xt])
|
self.pool
|
||||||
|
.submit(generic::BlockId::hash(best_block_hash), vec![xt])
|
||||||
.map(|mut res| res.pop().expect("One extrinsic passed; one result back; qed"))
|
.map(|mut res| res.pop().expect("One extrinsic passed; one result back; qed"))
|
||||||
.map_err(|e| e.into_pool_error()
|
.map_err(|e| e.into_pool_error()
|
||||||
.map(Into::into)
|
.map(Into::into)
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ use super::*;
|
|||||||
|
|
||||||
use std::{fmt, sync::Arc};
|
use std::{fmt, sync::Arc};
|
||||||
use extrinsic_pool::api;
|
use extrinsic_pool::api;
|
||||||
|
use test_client;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
|
|
||||||
type Extrinsic = u64;
|
type Extrinsic = u64;
|
||||||
@@ -40,11 +41,11 @@ impl fmt::Display for Error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl api::ExtrinsicPool<Extrinsic, Hash> for DummyTxPool {
|
impl<BlockHash> api::ExtrinsicPool<Extrinsic, BlockHash, u64> for DummyTxPool {
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
|
|
||||||
/// Submit extrinsic for inclusion in block.
|
/// Submit extrinsic for inclusion in block.
|
||||||
fn submit(&self, xt: Vec<Extrinsic>) -> ::std::result::Result<Vec<Hash>, Self::Error> {
|
fn submit(&self, _block: BlockHash, xt: Vec<Extrinsic>) -> ::std::result::Result<Vec<Hash>, Self::Error> {
|
||||||
let mut submitted = self.submitted.lock();
|
let mut submitted = self.submitted.lock();
|
||||||
if submitted.len() < 1 {
|
if submitted.len() < 1 {
|
||||||
let hashes = xt.iter().map(|_xt| 1).collect();
|
let hashes = xt.iter().map(|_xt| 1).collect();
|
||||||
@@ -58,7 +59,10 @@ impl api::ExtrinsicPool<Extrinsic, Hash> for DummyTxPool {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn submit_transaction_should_not_cause_error() {
|
fn submit_transaction_should_not_cause_error() {
|
||||||
let p = Arc::new(DummyTxPool::default());
|
let p = Author {
|
||||||
|
client: Arc::new(test_client::new()),
|
||||||
|
pool: Arc::new(DummyTxPool::default()),
|
||||||
|
};
|
||||||
|
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
AuthorApi::submit_extrinsic(&p, 5),
|
AuthorApi::submit_extrinsic(&p, 5),
|
||||||
|
|||||||
@@ -115,7 +115,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn check<ThisLookup>(self, lookup: ThisLookup) -> Result<Self::Checked, &'static str> where
|
fn check<ThisLookup>(self, lookup: ThisLookup) -> Result<Self::Checked, &'static str> where
|
||||||
ThisLookup: FnOnce(Address) -> Result<AccountId, &'static str> + Send + Sync,
|
ThisLookup: FnOnce(Address) -> Result<AccountId, &'static str>,
|
||||||
{
|
{
|
||||||
if !self.is_signed() {
|
if !self.is_signed() {
|
||||||
Ok(CheckedExtrinsic(Extrinsic {
|
Ok(CheckedExtrinsic(Extrinsic {
|
||||||
|
|||||||
@@ -162,7 +162,7 @@ impl<Call: 'static + AuxDispatchable + Slicable + Sized + Send + Sync + Serializ
|
|||||||
type Address = u64;
|
type Address = u64;
|
||||||
type AccountId = u64;
|
type AccountId = u64;
|
||||||
fn sender(&self) -> &u64 { &(self.0).0 }
|
fn sender(&self) -> &u64 { &(self.0).0 }
|
||||||
fn check<ThisLookup: FnOnce(Self::Address) -> Result<Self::AccountId, &'static str> + Send + Sync>(self, _lookup: ThisLookup) -> Result<Self::Checked, &'static str> { Ok(self) }
|
fn check<ThisLookup: FnOnce(Self::Address) -> Result<Self::AccountId, &'static str>>(self, _lookup: ThisLookup) -> Result<Self::Checked, &'static str> { Ok(self) }
|
||||||
}
|
}
|
||||||
impl<Call: AuxDispatchable<Aux = u64> + Slicable + Sized + Send + Sync + Serialize + DeserializeOwned + Clone + Eq + Debug> Applyable for TestXt<Call> {
|
impl<Call: AuxDispatchable<Aux = u64> + Slicable + Sized + Send + Sync + Serialize + DeserializeOwned + Clone + Eq + Debug> Applyable for TestXt<Call> {
|
||||||
type AccountId = u64;
|
type AccountId = u64;
|
||||||
|
|||||||
@@ -376,13 +376,13 @@ pub trait Checkable: Sized + Send + Sync {
|
|||||||
type AccountId: Member + MaybeDisplay;
|
type AccountId: Member + MaybeDisplay;
|
||||||
type Checked: Member;
|
type Checked: Member;
|
||||||
fn sender(&self) -> &Self::Address;
|
fn sender(&self) -> &Self::Address;
|
||||||
fn check<ThisLookup: FnOnce(Self::Address) -> Result<Self::AccountId, &'static str> + Send + Sync>(self, lookup: ThisLookup) -> Result<Self::Checked, &'static str>;
|
fn check<ThisLookup: FnOnce(Self::Address) -> Result<Self::AccountId, &'static str>>(self, lookup: ThisLookup) -> Result<Self::Checked, &'static str>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A "checkable" piece of information, used by the standard Substrate Executive in order to
|
/// A "checkable" piece of information, used by the standard Substrate Executive in order to
|
||||||
/// check the validity of a piece of extrinsic information, usually by verifying the signature.
|
/// check the validity of a piece of extrinsic information, usually by verifying the signature.
|
||||||
///
|
///
|
||||||
/// This does that checking without requiring a lookup argument.
|
/// This does that checking without requiring a lookup argument.
|
||||||
pub trait BlindCheckable: Sized + Send + Sync {
|
pub trait BlindCheckable: Sized + Send + Sync {
|
||||||
type Address: Member + MaybeDisplay;
|
type Address: Member + MaybeDisplay;
|
||||||
type Checked: Member;
|
type Checked: Member;
|
||||||
@@ -395,7 +395,7 @@ impl<T: BlindCheckable> Checkable for T {
|
|||||||
type AccountId = <Self as BlindCheckable>::Address;
|
type AccountId = <Self as BlindCheckable>::Address;
|
||||||
type Checked = <Self as BlindCheckable>::Checked;
|
type Checked = <Self as BlindCheckable>::Checked;
|
||||||
fn sender(&self) -> &Self::Address { BlindCheckable::sender(self) }
|
fn sender(&self) -> &Self::Address { BlindCheckable::sender(self) }
|
||||||
fn check<ThisLookup: FnOnce(Self::Address) -> Result<Self::AccountId, &'static str> + Send + Sync>(self, _: ThisLookup) -> Result<Self::Checked, &'static str> { BlindCheckable::check(self) }
|
fn check<ThisLookup: FnOnce(Self::Address) -> Result<Self::AccountId, &'static str>>(self, _: ThisLookup) -> Result<Self::Checked, &'static str> { BlindCheckable::check(self) }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An "executable" piece of information, used by the standard Substrate Executive in order to
|
/// An "executable" piece of information, used by the standard Substrate Executive in order to
|
||||||
|
|||||||
Reference in New Issue
Block a user