mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 02:17:58 +00:00
Adds fork-awareness and finalization notifications to transaction pool watchers. (#4740)
* adds finalization support to sc-transaction-pool using MaintainedTransactionPool for finalization events * adds TransactionStatus::Retracted, notify watchers of retracted blocks, finalized now finalizes, transactions for current finalized -> last finalized block * adds last_finalized to ChainApi, use generic BlockT for ChainEvent * fix tests * Apply suggestions from code review Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com> * tests * fix tests, docs, lazily dedupe pruned hashes * fix tests, Cargo.lock * Apply suggestions from code review Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com> * remove tree_route, last_finalized from ChainApi, add block hash to Finalization and Retracted events * prune finality watchers * fix tests * remove HeaderBackend bound from FullChainApi * code style nits, terminate stream in finality_timeout Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
This commit is contained in:
@@ -16,30 +16,34 @@
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt,
|
||||
hash,
|
||||
collections::HashMap, hash,
|
||||
};
|
||||
use linked_hash_map::LinkedHashMap;
|
||||
use serde::Serialize;
|
||||
use crate::watcher;
|
||||
use sp_runtime::traits;
|
||||
use crate::{watcher, ChainApi, BlockHash};
|
||||
use log::{debug, trace, warn};
|
||||
use sp_runtime::traits;
|
||||
|
||||
/// Extrinsic pool default listener.
|
||||
pub struct Listener<H: hash::Hash + Eq, H2> {
|
||||
watchers: HashMap<H, watcher::Sender<H, H2>>
|
||||
pub struct Listener<H: hash::Hash + Eq, C: ChainApi> {
|
||||
watchers: HashMap<H, watcher::Sender<H, BlockHash<C>>>,
|
||||
finality_watchers: LinkedHashMap<BlockHash<C>, Vec<H>>,
|
||||
}
|
||||
|
||||
impl<H: hash::Hash + Eq, H2> Default for Listener<H, H2> {
|
||||
/// Maximum number of blocks awaiting finality at any time.
|
||||
const MAX_FINALITY_WATCHERS: usize = 512;
|
||||
|
||||
impl<H: hash::Hash + Eq, C: ChainApi> Default for Listener<H, C> {
|
||||
fn default() -> Self {
|
||||
Listener {
|
||||
watchers: Default::default(),
|
||||
finality_watchers: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<H: hash::Hash + traits::Member + Serialize, H2: Clone + fmt::Debug> Listener<H, H2> {
|
||||
fn fire<F>(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender<H, H2>) {
|
||||
impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
|
||||
fn fire<F>(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender<H, BlockHash<C>>) {
|
||||
let clean = if let Some(h) = self.watchers.get_mut(hash) {
|
||||
fun(h);
|
||||
h.is_done()
|
||||
@@ -55,7 +59,7 @@ impl<H: hash::Hash + traits::Member + Serialize, H2: Clone + fmt::Debug> Listene
|
||||
/// Creates a new watcher for given verified extrinsic.
|
||||
///
|
||||
/// The watcher can be used to subscribe to lifecycle events of that extrinsic.
|
||||
pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher<H, H2> {
|
||||
pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher<H, BlockHash<C>> {
|
||||
let sender = self.watchers.entry(hash.clone()).or_insert_with(watcher::Sender::default);
|
||||
sender.new_watcher(hash)
|
||||
}
|
||||
@@ -101,8 +105,34 @@ impl<H: hash::Hash + traits::Member + Serialize, H2: Clone + fmt::Debug> Listene
|
||||
}
|
||||
|
||||
/// Transaction was pruned from the pool.
|
||||
pub fn pruned(&mut self, header_hash: H2, tx: &H) {
|
||||
debug!(target: "txpool", "[{:?}] Pruned at {:?}", tx, header_hash);
|
||||
self.fire(tx, |watcher| watcher.in_block(header_hash))
|
||||
pub fn pruned(&mut self, block_hash: BlockHash<C>, tx: &H) {
|
||||
debug!(target: "txpool", "[{:?}] Pruned at {:?}", tx, block_hash);
|
||||
self.fire(tx, |s| s.in_block(block_hash));
|
||||
self.finality_watchers.entry(block_hash).or_insert(vec![]).push(tx.clone());
|
||||
|
||||
while self.finality_watchers.len() > MAX_FINALITY_WATCHERS {
|
||||
if let Some((hash, txs)) = self.finality_watchers.pop_front() {
|
||||
for tx in txs {
|
||||
self.fire(&tx, |s| s.finality_timeout(hash.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The block this transaction was included in has been retracted.
|
||||
pub fn retracted(&mut self, block_hash: BlockHash<C>) {
|
||||
if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
|
||||
for hash in hashes {
|
||||
self.fire(&hash, |s| s.retracted(block_hash))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Notify all watchers that transactions have been finalized
|
||||
pub fn finalized(&mut self, block_hash: BlockHash<C>, txs: Vec<H>) {
|
||||
self.finality_watchers.remove(&block_hash);
|
||||
for h in txs {
|
||||
self.fire(&h, |s| s.finalized(block_hash.clone()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ use sp_runtime::{
|
||||
traits::{self, SaturatedConversion},
|
||||
transaction_validity::{TransactionValidity, TransactionTag as Tag, TransactionValidityError},
|
||||
};
|
||||
use sp_transaction_pool::{error, PoolStatus};
|
||||
use sp_transaction_pool::error;
|
||||
use wasm_timer::Instant;
|
||||
|
||||
use crate::validated_pool::{ValidatedPool, ValidatedTransaction};
|
||||
@@ -338,34 +338,6 @@ impl<B: ChainApi> Pool<B> {
|
||||
)
|
||||
}
|
||||
|
||||
/// Return an event stream of notifications for when transactions are imported to the pool.
|
||||
///
|
||||
/// Consumers of this stream should use the `ready` method to actually get the
|
||||
/// pending transactions in the right order.
|
||||
pub fn import_notification_stream(&self) -> EventStream<ExHash<B>> {
|
||||
self.validated_pool.import_notification_stream()
|
||||
}
|
||||
|
||||
/// Invoked when extrinsics are broadcasted.
|
||||
pub fn on_broadcasted(&self, propagated: HashMap<ExHash<B>, Vec<String>>) {
|
||||
self.validated_pool.on_broadcasted(propagated)
|
||||
}
|
||||
|
||||
/// Remove invalid transactions from the pool.
|
||||
pub fn remove_invalid(&self, hashes: &[ExHash<B>]) -> Vec<TransactionFor<B>> {
|
||||
self.validated_pool.remove_invalid(hashes)
|
||||
}
|
||||
|
||||
/// Get an iterator for ready transactions ordered by priority
|
||||
pub fn ready(&self) -> impl Iterator<Item=TransactionFor<B>> {
|
||||
self.validated_pool.ready()
|
||||
}
|
||||
|
||||
/// Returns pool status.
|
||||
pub fn status(&self) -> PoolStatus {
|
||||
self.validated_pool.status()
|
||||
}
|
||||
|
||||
/// Returns transaction hash
|
||||
pub fn hash_of(&self, xt: &ExtrinsicFor<B>) -> ExHash<B> {
|
||||
self.validated_pool.api().hash_and_length(xt).0
|
||||
@@ -454,9 +426,9 @@ impl<B: ChainApi> Pool<B> {
|
||||
(hash, validity)
|
||||
}
|
||||
|
||||
/// Get ready transaction by hash, if it present in the pool.
|
||||
pub fn ready_transaction(&self, hash: &ExHash<B>) -> Option<TransactionFor<B>> {
|
||||
self.validated_pool.ready_by_hash(hash)
|
||||
/// get a reference to the underlying validated pool.
|
||||
pub fn validated_pool(&self) -> &ValidatedPool<B> {
|
||||
&self.validated_pool
|
||||
}
|
||||
}
|
||||
|
||||
@@ -598,7 +570,7 @@ mod tests {
|
||||
}))).unwrap();
|
||||
|
||||
// then
|
||||
assert_eq!(pool.ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]);
|
||||
assert_eq!(pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -615,8 +587,8 @@ mod tests {
|
||||
// when
|
||||
pool.validated_pool.rotator().ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
|
||||
let res = block_on(pool.submit_one(&BlockId::Number(0), uxt));
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
assert_eq!(pool.status().future, 0);
|
||||
assert_eq!(pool.validated_pool().status().ready, 0);
|
||||
assert_eq!(pool.validated_pool().status().future, 0);
|
||||
|
||||
// then
|
||||
assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
|
||||
@@ -627,7 +599,7 @@ mod tests {
|
||||
let stream = {
|
||||
// given
|
||||
let pool = pool();
|
||||
let stream = pool.import_notification_stream();
|
||||
let stream = pool.validated_pool().import_notification_stream();
|
||||
|
||||
// when
|
||||
let _hash = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer {
|
||||
@@ -650,8 +622,8 @@ mod tests {
|
||||
nonce: 3,
|
||||
}))).unwrap();
|
||||
|
||||
assert_eq!(pool.status().ready, 2);
|
||||
assert_eq!(pool.status().future, 1);
|
||||
assert_eq!(pool.validated_pool().status().ready, 2);
|
||||
assert_eq!(pool.validated_pool().status().future, 1);
|
||||
stream
|
||||
};
|
||||
|
||||
@@ -689,9 +661,9 @@ mod tests {
|
||||
pool.validated_pool.clear_stale(&BlockId::Number(5)).unwrap();
|
||||
|
||||
// then
|
||||
assert_eq!(pool.ready().count(), 0);
|
||||
assert_eq!(pool.status().future, 0);
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
assert_eq!(pool.validated_pool().ready().count(), 0);
|
||||
assert_eq!(pool.validated_pool().status().future, 0);
|
||||
assert_eq!(pool.validated_pool().status().ready, 0);
|
||||
// make sure they are temporarily banned as well
|
||||
assert!(pool.validated_pool.rotator().is_banned(&hash1));
|
||||
assert!(pool.validated_pool.rotator().is_banned(&hash2));
|
||||
@@ -735,7 +707,7 @@ mod tests {
|
||||
amount: 5,
|
||||
nonce: 1,
|
||||
}))).unwrap();
|
||||
assert_eq!(pool.status().future, 1);
|
||||
assert_eq!(pool.validated_pool().status().future, 1);
|
||||
|
||||
// when
|
||||
let hash2 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer {
|
||||
@@ -746,7 +718,7 @@ mod tests {
|
||||
}))).unwrap();
|
||||
|
||||
// then
|
||||
assert_eq!(pool.status().future, 1);
|
||||
assert_eq!(pool.validated_pool().status().future, 1);
|
||||
assert!(pool.validated_pool.rotator().is_banned(&hash1));
|
||||
assert!(!pool.validated_pool.rotator().is_banned(&hash2));
|
||||
}
|
||||
@@ -773,8 +745,8 @@ mod tests {
|
||||
}))).unwrap_err();
|
||||
|
||||
// then
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
assert_eq!(pool.status().future, 0);
|
||||
assert_eq!(pool.validated_pool().status().ready, 0);
|
||||
assert_eq!(pool.validated_pool().status().future, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -791,8 +763,8 @@ mod tests {
|
||||
}))).unwrap_err();
|
||||
|
||||
// then
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
assert_eq!(pool.status().future, 0);
|
||||
assert_eq!(pool.validated_pool().status().ready, 0);
|
||||
assert_eq!(pool.validated_pool().status().future, 0);
|
||||
assert_matches!(err, error::Error::NoTagsProvided);
|
||||
}
|
||||
|
||||
@@ -809,19 +781,18 @@ mod tests {
|
||||
amount: 5,
|
||||
nonce: 0,
|
||||
}))).unwrap();
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
assert_eq!(pool.status().future, 0);
|
||||
assert_eq!(pool.validated_pool().status().ready, 1);
|
||||
assert_eq!(pool.validated_pool().status().future, 0);
|
||||
|
||||
// when
|
||||
block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![])).unwrap();
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
assert_eq!(pool.status().future, 0);
|
||||
assert_eq!(pool.validated_pool().status().ready, 0);
|
||||
assert_eq!(pool.validated_pool().status().future, 0);
|
||||
|
||||
// then
|
||||
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
|
||||
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
|
||||
assert_eq!(stream.next(), Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())));
|
||||
assert_eq!(stream.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -834,19 +805,18 @@ mod tests {
|
||||
amount: 5,
|
||||
nonce: 0,
|
||||
}))).unwrap();
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
assert_eq!(pool.status().future, 0);
|
||||
assert_eq!(pool.validated_pool().status().ready, 1);
|
||||
assert_eq!(pool.validated_pool().status().future, 0);
|
||||
|
||||
// when
|
||||
block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![2u64])).unwrap();
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
assert_eq!(pool.status().future, 0);
|
||||
assert_eq!(pool.validated_pool().status().ready, 0);
|
||||
assert_eq!(pool.validated_pool().status().future, 0);
|
||||
|
||||
// then
|
||||
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
|
||||
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
|
||||
assert_eq!(stream.next(), Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())));
|
||||
assert_eq!(stream.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -859,8 +829,8 @@ mod tests {
|
||||
amount: 5,
|
||||
nonce: 1,
|
||||
}))).unwrap();
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
assert_eq!(pool.status().future, 1);
|
||||
assert_eq!(pool.validated_pool().status().ready, 0);
|
||||
assert_eq!(pool.validated_pool().status().future, 1);
|
||||
|
||||
// when
|
||||
block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer {
|
||||
@@ -869,7 +839,7 @@ mod tests {
|
||||
amount: 5,
|
||||
nonce: 0,
|
||||
}))).unwrap();
|
||||
assert_eq!(pool.status().ready, 2);
|
||||
assert_eq!(pool.validated_pool().status().ready, 2);
|
||||
|
||||
// then
|
||||
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
|
||||
@@ -888,7 +858,7 @@ mod tests {
|
||||
nonce: 0,
|
||||
});
|
||||
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), uxt)).unwrap();
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
assert_eq!(pool.validated_pool().status().ready, 1);
|
||||
|
||||
// when
|
||||
pool.validated_pool.remove_invalid(&[*watcher.hash()]);
|
||||
@@ -912,13 +882,13 @@ mod tests {
|
||||
nonce: 0,
|
||||
});
|
||||
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), uxt)).unwrap();
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
assert_eq!(pool.validated_pool().status().ready, 1);
|
||||
|
||||
// when
|
||||
let mut map = HashMap::new();
|
||||
let peers = vec!["a".into(), "b".into(), "c".into()];
|
||||
map.insert(*watcher.hash(), peers.clone());
|
||||
pool.on_broadcasted(map);
|
||||
pool.validated_pool().on_broadcasted(map);
|
||||
|
||||
|
||||
// then
|
||||
@@ -947,7 +917,7 @@ mod tests {
|
||||
nonce: 0,
|
||||
});
|
||||
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), xt)).unwrap();
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
assert_eq!(pool.validated_pool().status().ready, 1);
|
||||
|
||||
// when
|
||||
let xt = uxt(Transfer {
|
||||
@@ -957,7 +927,7 @@ mod tests {
|
||||
nonce: 1,
|
||||
});
|
||||
block_on(pool.submit_one(&BlockId::Number(1), xt)).unwrap();
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
assert_eq!(pool.validated_pool().status().ready, 1);
|
||||
|
||||
// then
|
||||
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
|
||||
@@ -1000,11 +970,11 @@ mod tests {
|
||||
// The tag the above transaction provides (TestApi is using just nonce as u8)
|
||||
let provides = vec![0_u8];
|
||||
block_on(pool.submit_one(&BlockId::Number(0), xt)).unwrap();
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
assert_eq!(pool.validated_pool().status().ready, 1);
|
||||
|
||||
// Now block import happens before the second transaction is able to finish verification.
|
||||
block_on(pool.prune_tags(&BlockId::Number(1), vec![provides], vec![])).unwrap();
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
assert_eq!(pool.validated_pool().status().ready, 0);
|
||||
|
||||
|
||||
// so when we release the verification of the previous one it will have
|
||||
@@ -1014,8 +984,8 @@ mod tests {
|
||||
|
||||
// then
|
||||
is_ready.recv().unwrap(); // wait for finish
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
assert_eq!(pool.status().future, 0);
|
||||
assert_eq!(pool.validated_pool().status().ready, 1);
|
||||
assert_eq!(pool.validated_pool().status().future, 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1047,7 +1017,7 @@ mod tests {
|
||||
let tx4 = transfer(4);
|
||||
let hash4 = pool.validated_pool.api().hash_and_length(&tx4).0;
|
||||
let watcher4 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx4)).unwrap();
|
||||
assert_eq!(pool.status().ready, 5);
|
||||
assert_eq!(pool.validated_pool().status().ready, 5);
|
||||
|
||||
// when
|
||||
pool.validated_pool.api().invalidate.lock().insert(hash3);
|
||||
@@ -1064,7 +1034,7 @@ mod tests {
|
||||
//
|
||||
// events for hash3 are: Ready, Invalid
|
||||
// events for hash4 are: Ready, Invalid
|
||||
assert_eq!(pool.status().ready, 2);
|
||||
assert_eq!(pool.validated_pool().status().ready, 2);
|
||||
assert_eq!(
|
||||
futures::executor::block_on_stream(watcher3.into_stream()).collect::<Vec<_>>(),
|
||||
vec![TransactionStatus::Ready, TransactionStatus::Invalid],
|
||||
@@ -1095,4 +1065,3 @@ mod tests {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,12 +16,11 @@
|
||||
|
||||
use std::{
|
||||
collections::{HashSet, HashMap},
|
||||
fmt,
|
||||
hash,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use crate::base_pool as base;
|
||||
use crate::{base_pool as base, BlockHash};
|
||||
use crate::listener::Listener;
|
||||
use crate::rotator::PoolRotator;
|
||||
use crate::watcher::Watcher;
|
||||
@@ -39,7 +38,7 @@ use sp_transaction_pool::{error, PoolStatus};
|
||||
use wasm_timer::Instant;
|
||||
|
||||
use crate::base_pool::PruneStatus;
|
||||
use crate::pool::{EventStream, Options, ChainApi, BlockHash, ExHash, ExtrinsicFor, TransactionFor};
|
||||
use crate::pool::{EventStream, Options, ChainApi, ExHash, ExtrinsicFor, TransactionFor};
|
||||
|
||||
/// Pre-validated transaction. Validated pool only accepts transactions wrapped in this enum.
|
||||
#[derive(Debug)]
|
||||
@@ -62,10 +61,10 @@ pub type ValidatedTransactionFor<B> = ValidatedTransaction<
|
||||
>;
|
||||
|
||||
/// Pool that deals with validated transactions.
|
||||
pub(crate) struct ValidatedPool<B: ChainApi> {
|
||||
pub struct ValidatedPool<B: ChainApi> {
|
||||
api: Arc<B>,
|
||||
options: Options,
|
||||
listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>,
|
||||
listener: RwLock<Listener<ExHash<B>, B>>,
|
||||
pool: RwLock<base::BasePool<
|
||||
ExHash<B>,
|
||||
ExtrinsicFor<B>,
|
||||
@@ -91,9 +90,9 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
pub fn new(options: Options, api: Arc<B>) -> Self {
|
||||
let base_pool = base::BasePool::new(options.reject_future_transactions);
|
||||
ValidatedPool {
|
||||
api,
|
||||
options,
|
||||
listener: Default::default(),
|
||||
api,
|
||||
pool: RwLock::new(base_pool),
|
||||
import_notification_sinks: Default::default(),
|
||||
rotator: Default::default(),
|
||||
@@ -138,13 +137,14 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
let imported = self.pool.write().import(tx)?;
|
||||
|
||||
if let base::Imported::Ready { ref hash, .. } = imported {
|
||||
self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(hash.clone()).is_ok());
|
||||
self.import_notification_sinks.lock()
|
||||
.retain(|sink| sink.unbounded_send(hash.clone()).is_ok());
|
||||
}
|
||||
|
||||
let mut listener = self.listener.write();
|
||||
fire_events(&mut *listener, &imported);
|
||||
Ok(imported.hash().clone())
|
||||
}
|
||||
},
|
||||
ValidatedTransaction::Invalid(hash, err) => {
|
||||
self.rotator.ban(&Instant::now(), std::iter::once(hash));
|
||||
Err(err.into())
|
||||
@@ -152,7 +152,7 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
ValidatedTransaction::Unknown(hash, err) => {
|
||||
self.listener.write().invalid(&hash, false);
|
||||
Err(err.into())
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -343,8 +343,7 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
self.pool.read().by_hashes(&hashes)
|
||||
.into_iter()
|
||||
.map(|existing_in_pool| existing_in_pool
|
||||
.map(|transaction| transaction.provides.iter().cloned()
|
||||
.collect()))
|
||||
.map(|transaction| transaction.provides.iter().cloned().collect()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -416,8 +415,14 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
let header_hash = self.api.block_id_to_hash(at)?
|
||||
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?;
|
||||
let mut listener = self.listener.write();
|
||||
let mut set = HashSet::with_capacity(hashes.size_hint().0);
|
||||
for h in hashes {
|
||||
listener.pruned(header_hash, &h);
|
||||
// `hashes` has possibly duplicate hashes.
|
||||
// we'd like to send out the `InBlock` notification only once.
|
||||
if !set.contains(&h) {
|
||||
listener.pruned(header_hash, &h);
|
||||
set.insert(h);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -468,7 +473,10 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
&self.api
|
||||
}
|
||||
|
||||
/// Return an event stream of transactions imported to the pool.
|
||||
/// Return an event stream of notifications for when transactions are imported to the pool.
|
||||
///
|
||||
/// Consumers of this stream should use the `ready` method to actually get the
|
||||
/// pending transactions in the right order.
|
||||
pub fn import_notification_stream(&self) -> EventStream<ExHash<B>> {
|
||||
let (sink, stream) = mpsc::unbounded();
|
||||
self.import_notification_sinks.lock().push(sink);
|
||||
@@ -492,7 +500,7 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
pub fn remove_invalid(&self, hashes: &[ExHash<B>]) -> Vec<TransactionFor<B>> {
|
||||
// early exit in case there is no invalid transactions.
|
||||
if hashes.is_empty() {
|
||||
return vec![]
|
||||
return vec![];
|
||||
}
|
||||
|
||||
debug!(target: "txpool", "Removing invalid transactions: {:?}", hashes);
|
||||
@@ -521,14 +529,34 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
pub fn status(&self) -> PoolStatus {
|
||||
self.pool.read().status()
|
||||
}
|
||||
|
||||
/// Notify all watchers that transactions in the block with hash have been finalized
|
||||
pub async fn on_block_finalized(&self, block_hash: BlockHash<B>) -> Result<(), B::Error> {
|
||||
debug!(target: "txpool", "Attempting to notify watchers of finalization for {}", block_hash);
|
||||
// fetch all extrinsic hashes
|
||||
if let Some(txs) = self.api.block_body(&BlockId::Hash(block_hash.clone())).await? {
|
||||
let tx_hashes = txs.into_iter()
|
||||
.map(|tx| self.api.hash_and_length(&tx).0)
|
||||
.collect::<Vec<_>>();
|
||||
// notify the watcher that these extrinsics have been finalized
|
||||
self.listener.write().finalized(block_hash, tx_hashes);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Notify the listener of retracted blocks
|
||||
pub fn on_block_retracted(&self, block_hash: BlockHash<B>) {
|
||||
self.listener.write().retracted(block_hash)
|
||||
}
|
||||
}
|
||||
|
||||
fn fire_events<H, H2, Ex>(
|
||||
listener: &mut Listener<H, H2>,
|
||||
fn fire_events<H, B, Ex>(
|
||||
listener: &mut Listener<H, B>,
|
||||
imported: &base::Imported<H, Ex>,
|
||||
) where
|
||||
H: hash::Hash + Eq + traits::Member + Serialize,
|
||||
H2: Clone + fmt::Debug,
|
||||
B: ChainApi,
|
||||
{
|
||||
match *imported {
|
||||
base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
|
||||
|
||||
@@ -26,12 +26,12 @@ use sp_transaction_pool::TransactionStatus;
|
||||
///
|
||||
/// Represents a stream of status updates for particular extrinsic.
|
||||
#[derive(Debug)]
|
||||
pub struct Watcher<H, H2> {
|
||||
receiver: mpsc::UnboundedReceiver<TransactionStatus<H, H2>>,
|
||||
pub struct Watcher<H, BH> {
|
||||
receiver: mpsc::UnboundedReceiver<TransactionStatus<H, BH>>,
|
||||
hash: H,
|
||||
}
|
||||
|
||||
impl<H, H2> Watcher<H, H2> {
|
||||
impl<H, BH> Watcher<H, BH> {
|
||||
/// Returns the transaction hash.
|
||||
pub fn hash(&self) -> &H {
|
||||
&self.hash
|
||||
@@ -40,30 +40,30 @@ impl<H, H2> Watcher<H, H2> {
|
||||
/// Pipe the notifications to given sink.
|
||||
///
|
||||
/// Make sure to drive the future to completion.
|
||||
pub fn into_stream(self) -> impl Stream<Item=TransactionStatus<H, H2>> {
|
||||
pub fn into_stream(self) -> impl Stream<Item=TransactionStatus<H, BH>> {
|
||||
self.receiver
|
||||
}
|
||||
}
|
||||
|
||||
/// Sender part of the watcher. Exposed only for testing purposes.
|
||||
#[derive(Debug)]
|
||||
pub struct Sender<H, H2> {
|
||||
receivers: Vec<mpsc::UnboundedSender<TransactionStatus<H, H2>>>,
|
||||
finalized: bool,
|
||||
pub struct Sender<H, BH> {
|
||||
receivers: Vec<mpsc::UnboundedSender<TransactionStatus<H, BH>>>,
|
||||
is_finalized: bool,
|
||||
}
|
||||
|
||||
impl<H, H2> Default for Sender<H, H2> {
|
||||
impl<H, BH> Default for Sender<H, BH> {
|
||||
fn default() -> Self {
|
||||
Sender {
|
||||
receivers: Default::default(),
|
||||
finalized: false,
|
||||
is_finalized: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<H: Clone, H2: Clone> Sender<H, H2> {
|
||||
impl<H: Clone, BH: Clone> Sender<H, BH> {
|
||||
/// Add a new watcher to this sender object.
|
||||
pub fn new_watcher(&mut self, hash: H) -> Watcher<H, H2> {
|
||||
pub fn new_watcher(&mut self, hash: H) -> Watcher<H, BH> {
|
||||
let (tx, receiver) = mpsc::unbounded();
|
||||
self.receivers.push(tx);
|
||||
Watcher {
|
||||
@@ -85,26 +85,42 @@ impl<H: Clone, H2: Clone> Sender<H, H2> {
|
||||
/// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid.
|
||||
pub fn usurped(&mut self, hash: H) {
|
||||
self.send(TransactionStatus::Usurped(hash));
|
||||
self.finalized = true;
|
||||
self.is_finalized = true;
|
||||
}
|
||||
|
||||
/// Extrinsic has been included in block with given hash.
|
||||
pub fn in_block(&mut self, hash: H2) {
|
||||
pub fn in_block(&mut self, hash: BH) {
|
||||
self.send(TransactionStatus::InBlock(hash));
|
||||
self.finalized = true;
|
||||
}
|
||||
|
||||
/// Extrinsic has been finalized by a finality gadget.
|
||||
pub fn finalized(&mut self, hash: BH) {
|
||||
self.send(TransactionStatus::Finalized(hash));
|
||||
self.is_finalized = true;
|
||||
}
|
||||
|
||||
/// The block this extrinsic was included in has been retracted
|
||||
pub fn finality_timeout(&mut self, hash: BH) {
|
||||
self.send(TransactionStatus::FinalityTimeout(hash));
|
||||
self.is_finalized = true;
|
||||
}
|
||||
|
||||
/// The block this extrinsic was included in has been retracted
|
||||
pub fn retracted(&mut self, hash: BH) {
|
||||
self.send(TransactionStatus::Retracted(hash));
|
||||
}
|
||||
|
||||
/// Extrinsic has been marked as invalid by the block builder.
|
||||
pub fn invalid(&mut self) {
|
||||
self.send(TransactionStatus::Invalid);
|
||||
// we mark as finalized as there are no more notifications
|
||||
self.finalized = true;
|
||||
self.is_finalized = true;
|
||||
}
|
||||
|
||||
/// Transaction has been dropped from the pool because of the limit.
|
||||
pub fn dropped(&mut self) {
|
||||
self.send(TransactionStatus::Dropped);
|
||||
self.finalized = true;
|
||||
self.is_finalized = true;
|
||||
}
|
||||
|
||||
/// The extrinsic has been broadcast to the given peers.
|
||||
@@ -114,10 +130,10 @@ impl<H: Clone, H2: Clone> Sender<H, H2> {
|
||||
|
||||
/// Returns true if the are no more listeners for this extrinsic or it was finalized.
|
||||
pub fn is_done(&self) -> bool {
|
||||
self.finalized || self.receivers.is_empty()
|
||||
self.is_finalized || self.receivers.is_empty()
|
||||
}
|
||||
|
||||
fn send(&mut self, status: TransactionStatus<H, H2>) {
|
||||
fn send(&mut self, status: TransactionStatus<H, BH>) {
|
||||
self.receivers.retain(|sender| sender.unbounded_send(status.clone()).is_ok())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user