Files
pezkuwi-subxt/substrate/client/transaction-pool/src/lib.rs
T
Ashley ead6815ae4 Fix timer panics in the wasm light client (#4561)
* Make WASM browser thing compile

* Fix

* updated exit-future (github repo)

* Switch to broadcast crate

* Migrate client/cli

* Switch exit-future to modernize branch

* Small changes

* Switch to cargo version and fix fg tests

* fix basic-authorship

* Fix crash on grafana macro

* Fix grafana macro

* Switch node python version

* Disable record_metrics_slice in grafana macro on wasm

* Update client/grafana-data-source/src/lib.rs

* Revert "Update client/grafana-data-source/src/lib.rs"

This reverts commit 888009a8e0b7051bd4bfbbfdb0448bcf2e2aae93.

* Add wasm support for state machine

* Switch to my own libp2p version

* Revert "Switch to my own libp2p version"

This reverts commit ce613871b59264b3165b45c37943e6560240daa7.

* Revert "Add wasm support for state machine"

This reverts commit de7eaa0694d9534fc3b164621737968e9a6a7c5f.

* Add sc-browser

* Squash

* remove sc-browser

* Fix keystore on wasm

* stubs for removed functions to make env compatible with old runtimes

* Add test (that doesn't work)

* Fix build scripts

* Revert basic-authorship due to no panics

* Revert cli/informant

* Revert consensus

* revert offchain

* Update utils/browser/Cargo.toml

Co-Authored-By: Benjamin Kampmann <ben@gnunicorn.org>

* export console functions

* Add new chainspec

* Fix ws in chain spec

* revert chainspec

* Fix chainspec

* Use an Option<PathBuf> in keystore instead of cfg flags

* Remove crud

* Only use wasm-timer for instant and systemtime

* Remove telemetry changes

* Assuming this is ok

* Add a KeystoreConfig

* Add stubs back in

* Update libp2p

* Revert "Add stubs back in"

This reverts commit 4690cf1882aa0f99f7f00a58c4080c8aa9b77c36.

* Remove commented js again

* Bump kvdb-web version

* Fix cli

* Switch branch on futures-timer

* Fix tests

* Remove sc-client test build in check-web-wasm because there isn't a good way to build futures-timer with wasm-bindgen support in the build

* Remove more things ^^

* Switch branch on futures-timer back

* Put DB io stats behind a cfg flag

* Fix things

* Don't timeout transports on wasm

* Update branch of futures-timer and fix bad merge

* Spawn informant

* Fix network test

* Fix delay resets

* Changes

* Fix tests

* use wasm_timer for transaction pool

* Fixes

* Switch futures-timer to crates

* Only diagnose futures on native

* Fix sc-network-test tests

* Select log level in js

* Fix syncing ;^)

* Allow disabling colours in the informant

* Use OutputFormat enum for informant

* MallocSizeOf impl on transaction pool broke stuff because wasm_timer::Instant doesnt impl it so just revert the transaction pool to master

* Update futures-diagnose

* Revert "MallocSizeOf impl on transaction pool broke stuff because wasm_timer::Instant doesnt impl it so just revert the transaction pool to master"

This reverts commit baa4ffc94fd968b6660a2c17ba8113e06af15548.

* Pass whole chain spec in start_client

* Get Instant::now to work in transaction pool again

* Informant dep reordering

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: Svyatoslav Nikolsky <svyatonik@gmail.com>
Co-authored-by: Benjamin Kampmann <ben.kampmann@googlemail.com>
Co-authored-by: Demi Obenour <48690212+DemiMarie-parity@users.noreply.github.com>
2020-02-10 12:23:55 +01:00

370 lines
10 KiB
Rust

// Copyright 2018-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Substrate transaction pool implementation.
#![warn(missing_docs)]
#![warn(unused_extern_crates)]
mod api;
pub mod error;
#[cfg(any(feature = "test-helpers", test))]
pub mod testing;
pub use sc_transaction_graph as txpool;
pub use crate::api::{FullChainApi, LightChainApi};
use std::{collections::HashMap, sync::Arc, pin::Pin};
use futures::{Future, FutureExt, future::ready};
use parking_lot::Mutex;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, NumberFor, SimpleArithmetic, Extrinsic},
};
use sp_transaction_pool::{
TransactionPool, PoolStatus, ImportNotificationStream,
TxHash, TransactionFor, TransactionStatusStreamFor, BlockHash,
MaintainedTransactionPool, PoolFuture,
};
use wasm_timer::Instant;
/// Basic implementation of transaction pool that can be customized by providing PoolApi.
///
/// Bundles the shared transaction graph pool with the chain API it was created
/// from and the state that decides when ready transactions are revalidated.
pub struct BasicPool<PoolApi, Block>
where
Block: BlockT,
PoolApi: sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
{
/// Underlying transaction graph pool; shared so that returned futures can own a handle.
pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
/// Chain API handle, used by `maintain` to resolve block numbers and fetch block bodies.
api: Arc<PoolApi>,
/// Revalidation policy together with its scheduling state, consulted on every `maintain` call.
revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
}
// Heap-size accounting for the pool; compiled out when `target_os` is "unknown".
// NOTE(review): presumably this excludes wasm targets such as wasm32-unknown-unknown — confirm.
#[cfg(not(target_os = "unknown"))]
impl<PoolApi, Block> parity_util_mem::MallocSizeOf for BasicPool<PoolApi, Block>
where
PoolApi: sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
PoolApi::Hash: parity_util_mem::MallocSizeOf,
Block: BlockT,
{
/// Reports the size measured for the `pool` field only.
fn size_of(&self, ops: &mut parity_util_mem::MallocSizeOfOps) -> usize {
// `api` and `revalidation_strategy` are deliberately not counted:
// other entries insignificant or non-primary references
self.pool.size_of(ops)
}
}
/// Type of revalidation.
///
/// Selects how a `BasicPool` revalidates and resubmits transactions during maintenance.
pub enum RevalidationType {
/// Light revalidation type.
///
/// During maintenance, the transaction pool periodically revalidates
/// all transactions, depending on the number of blocks or the time passed.
/// This kind of revalidation also does not resubmit transactions from
/// retracted blocks, since that is too expensive.
Light,
/// Full revalidation type.
///
/// During maintenance, the transaction pool revalidates a fixed number of
/// transactions from the pool of valid transactions and resubmits
/// transactions from retracted blocks. This is the type used by `BasicPool::new`.
Full,
}
impl<PoolApi, Block> BasicPool<PoolApi, Block>
where
    Block: BlockT,
    PoolApi: sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
{
    /// Builds a basic transaction pool on top of `pool_api`.
    ///
    /// Shorthand for `with_revalidation_type` using `RevalidationType::Full`.
    pub fn new(
        options: sc_transaction_graph::Options,
        pool_api: Arc<PoolApi>,
    ) -> Self {
        Self::with_revalidation_type(options, pool_api, RevalidationType::Full)
    }

    /// Builds a basic transaction pool on top of `pool_api` with an explicitly
    /// chosen revalidation type.
    ///
    /// `RevalidationType::Light` starts with no revalidation scheduled, while
    /// `RevalidationType::Full` revalidates on every maintenance run.
    pub fn with_revalidation_type(
        options: sc_transaction_graph::Options,
        pool_api: Arc<PoolApi>,
        revalidation_type: RevalidationType,
    ) -> Self {
        let strategy = match revalidation_type {
            RevalidationType::Full => RevalidationStrategy::Always,
            RevalidationType::Light => RevalidationStrategy::Light(RevalidationStatus::NotScheduled),
        };
        // The graph pool and `BasicPool` itself each keep their own handle to the API.
        let graph = sc_transaction_graph::Pool::new(options, Arc::clone(&pool_api));

        Self {
            pool: Arc::new(graph),
            api: pool_api,
            revalidation_strategy: Arc::new(Mutex::new(strategy)),
        }
    }

    /// Returns a shared reference to the underlying transaction graph pool.
    pub fn pool(&self) -> &Arc<sc_transaction_graph::Pool<PoolApi>> {
        &self.pool
    }
}
impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
where
    Block: BlockT,
    PoolApi: 'static + sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
{
    type Block = PoolApi::Block;
    type Hash = sc_transaction_graph::ExHash<PoolApi>;
    type InPoolTransaction = sc_transaction_graph::base_pool::Transaction<TxHash<Self>, TransactionFor<Self>>;
    type Error = PoolApi::Error;

    // Submits a batch of transactions at block `at` by delegating to the graph pool.
    fn submit_at(
        &self,
        at: &BlockId<Self::Block>,
        xts: Vec<TransactionFor<Self>>,
    ) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
        // The future must own everything it uses, so copy the block id and
        // take a separate handle to the pool.
        let graph = Arc::clone(&self.pool);
        let block = *at;
        // NOTE(review): the trailing `false` is presumably the graph pool's
        // "force" flag (`maintain` passes `true`) — confirm against `Pool::submit_at`.
        let submission = async move { graph.submit_at(&block, xts, false).await };
        submission.boxed()
    }

    // Submits a single transaction at block `at` by delegating to the graph pool.
    fn submit_one(
        &self,
        at: &BlockId<Self::Block>,
        xt: TransactionFor<Self>,
    ) -> PoolFuture<TxHash<Self>, Self::Error> {
        let graph = Arc::clone(&self.pool);
        let block = *at;
        let submission = async move { graph.submit_one(&block, xt).await };
        submission.boxed()
    }

    // Submits a single transaction and resolves to a boxed stream built from its watcher.
    fn submit_and_watch(
        &self,
        at: &BlockId<Self::Block>,
        xt: TransactionFor<Self>,
    ) -> PoolFuture<Box<TransactionStatusStreamFor<Self>>, Self::Error> {
        let graph = Arc::clone(&self.pool);
        let block = *at;
        let submission = async move {
            let outcome = graph.submit_and_watch(&block, xt).await;
            // On success, turn the watcher into the boxed stream type of the trait.
            outcome.map(|watcher| Box::new(watcher.into_stream()) as _)
        };
        submission.boxed()
    }

    // The remaining methods forward directly to the graph pool.

    fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
        self.pool.remove_invalid(hashes)
    }

    fn status(&self) -> PoolStatus {
        self.pool.status()
    }

    fn ready(&self) -> Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>>> {
        let ready_iter = self.pool.ready();
        Box::new(ready_iter)
    }

    fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
        self.pool.import_notification_stream()
    }

    fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
        self.pool.hash_of(xt)
    }

    fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
        self.pool.on_broadcasted(propagations)
    }

    fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
        self.pool.ready_transaction(hash)
    }
}
/// Scheduling state of the periodic ("light") revalidation.
#[cfg_attr(test, derive(Debug))]
enum RevalidationStatus<N> {
/// No revalidation is scheduled. This is the initial state and the state
/// the status returns to when `clear` is called.
NotScheduled,
/// The revalidation is scheduled. It becomes due once the given instant or
/// the given block number has been reached, whichever happens first; a
/// `None` disables the respective trigger.
Scheduled(Option<Instant>, Option<N>),
/// The revalidation is in progress.
InProgress,
}
/// Revalidation policy of the pool, derived from `RevalidationType`.
enum RevalidationStrategy<N> {
/// Revalidate and resubmit on every maintenance run (chosen for `RevalidationType::Full`).
Always,
/// Revalidate only when the wrapped schedule reports that it is due
/// (chosen for `RevalidationType::Light`).
Light(RevalidationStatus<N>)
}
/// What a single maintenance run should do, as decided by `RevalidationStrategy::next`.
struct RevalidationAction {
/// Whether ready transactions should be revalidated in this run.
revalidate: bool,
/// Whether transactions from retracted blocks should be resubmitted to the pool.
resubmit: bool,
/// Amount passed on to the pool's `revalidate_ready` call.
/// NOTE(review): `None` presumably means "no limit" — confirm against `revalidate_ready`.
revalidate_amount: Option<usize>,
}
impl<N: Clone + Copy + SimpleArithmetic> RevalidationStrategy<N> {
    /// Resets the schedule of the light strategy; does nothing for `Always`.
    pub fn clear(&mut self) {
        match self {
            Self::Light(status) => status.clear(),
            Self::Always => {},
        }
    }

    /// Decides what the maintenance run for `block` should do.
    ///
    /// `Always` requests revalidation of 16 transactions plus resubmission on
    /// every call. `Light` never resubmits and requests revalidation only when
    /// its schedule, driven by the two optional periods, says it is due.
    pub fn next(
        &mut self,
        block: N,
        revalidate_time_period: Option<std::time::Duration>,
        revalidate_block_period: Option<N>,
    ) -> RevalidationAction {
        match self {
            Self::Always => RevalidationAction {
                revalidate: true,
                resubmit: true,
                revalidate_amount: Some(16),
            },
            Self::Light(status) => {
                // Note: this call also advances the schedule state.
                let due = status.next_required(
                    block,
                    revalidate_time_period,
                    revalidate_block_period,
                );
                RevalidationAction {
                    revalidate: due,
                    resubmit: false,
                    revalidate_amount: None,
                }
            },
        }
    }
}
impl<N: Clone + Copy + SimpleArithmetic> RevalidationStatus<N> {
    /// Called when revalidation is completed; returns to the unscheduled state.
    pub fn clear(&mut self) {
        *self = Self::NotScheduled;
    }

    /// Advances the schedule and returns true if revalidation is required now.
    ///
    /// * Unscheduled: arms the schedule from the optional time and block
    ///   periods (relative to now and to `block`) and returns false.
    /// * Scheduled: returns true, and switches to "in progress", once the
    ///   deadline instant or the target block has been reached.
    /// * In progress: always returns false.
    pub fn next_required(
        &mut self,
        block: N,
        revalidate_time_period: Option<std::time::Duration>,
        revalidate_block_period: Option<N>,
    ) -> bool {
        match *self {
            Self::InProgress => false,
            Self::NotScheduled => {
                let deadline = revalidate_time_period.map(|period| Instant::now() + period);
                let target_block = revalidate_block_period.map(|period| block + period);
                *self = Self::Scheduled(deadline, target_block);
                false
            },
            Self::Scheduled(deadline, target_block) => {
                // A missing deadline or target block never triggers on its own.
                let time_reached = match deadline {
                    Some(at) => Instant::now() >= at,
                    None => false,
                };
                let block_reached = match target_block {
                    Some(at) => block >= at,
                    None => false,
                };

                if time_reached || block_reached {
                    *self = Self::InProgress;
                    true
                } else {
                    false
                }
            },
        }
    }
}
impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
where
    Block: BlockT,
    PoolApi: 'static + sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
{
    /// Performs pool maintenance for block `id`.
    ///
    /// The returned future:
    /// 1. prunes transactions contained in the body of `id` from the pool
    ///    (skipped when the pool is empty),
    /// 2. if the revalidation strategy asks for it, resubmits transactions
    ///    from the `retracted` blocks,
    /// 3. if the revalidation strategy asks for it, revalidates ready
    ///    transactions and then resets the revalidation schedule.
    ///
    /// Failures in any step are logged and do not abort the remaining steps.
    /// If the number of block `id` cannot be resolved, nothing is done.
    fn maintain(&self, id: &BlockId<Self::Block>, retracted: &[BlockHash<Self>])
        -> Pin<Box<dyn Future<Output=()> + Send>>
    {
        let id = *id;
        let pool = self.pool.clone();
        let api = self.api.clone();

        let block_number = match api.block_id_to_number(&id) {
            Ok(Some(number)) => number,
            _ => {
                log::trace!(target: "txqueue", "Skipping chain event - no number for that block {:?}", id);
                return Box::pin(ready(()));
            }
        };

        // Decide synchronously what this run has to do; the lock is released
        // before the future is created and is never held across an await.
        let next_action = self.revalidation_strategy.lock().next(
            block_number,
            Some(std::time::Duration::from_secs(60)),
            Some(20.into()),
        );
        let revalidation_strategy = self.revalidation_strategy.clone();
        let retracted = retracted.to_vec();

        async move {
            // We don't query block if we won't prune anything
            if !pool.status().is_empty() {
                let hashes = api.block_body(&id).await
                    .unwrap_or_else(|e| {
                        log::warn!("Prune known transactions: error request {:?}!", e);
                        None
                    })
                    .unwrap_or_default()
                    .into_iter()
                    .map(|tx| pool.hash_of(&tx))
                    .collect::<Vec<_>>();

                if let Err(e) = pool.prune_known(&id, &hashes) {
                    log::error!("Cannot prune known in the pool {:?}!", e);
                }
            }

            if next_action.resubmit {
                let mut resubmit_transactions = Vec::new();

                for retracted_hash in retracted {
                    let block_transactions = api.block_body(&BlockId::hash(retracted_hash)).await
                        .unwrap_or_else(|e| {
                            log::warn!("Failed to fetch block body {:?}!", e);
                            None
                        })
                        .unwrap_or_default()
                        .into_iter()
                        // Keep signed extrinsics, and those whose signedness is unknown.
                        .filter(|tx| tx.is_signed().unwrap_or(true));

                    resubmit_transactions.extend(block_transactions);
                }

                if let Err(e) = pool.submit_at(&id, resubmit_transactions, true).await {
                    log::debug!(target: "txpool",
                        "[{:?}] Error re-submitting transactions: {:?}", id, e
                    )
                }
            }

            if next_action.revalidate {
                if let Err(e) = pool.revalidate_ready(&id, next_action.revalidate_amount).await {
                    log::warn!("Revalidate ready failed {:?}", e);
                }

                // Reset the schedule only after a revalidation actually ran
                // (successfully or not). Clearing on every maintenance run would
                // throw away a freshly armed schedule before its time/block
                // deadline could ever be reached, so the light strategy would
                // never revalidate, and it could also wipe the "in progress"
                // marker of a concurrent run.
                revalidation_strategy.lock().clear();
            }
        }.boxed()
    }
}