Longevity handling. (#903)

This commit is contained in:
Tomasz Drwięga
2018-10-16 10:04:19 +02:00
committed by Gav Wood
parent 8050979660
commit a24e61cb29
9 changed files with 152 additions and 175 deletions
@@ -47,7 +47,7 @@ pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
/// Block number type for the ChainApi
pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
/// A type of transaction stored in the pool
pub type TransactionFor<A> = Arc<base::Transaction<ExHash<A>, TxData<ExtrinsicFor<A>>>>;
pub type TransactionFor<A> = Arc<base::Transaction<ExHash<A>, ExtrinsicFor<A>>>;
/// Concrete extrinsic validation and query logic.
pub trait ChainApi: Send + Sync {
@@ -71,22 +71,6 @@ pub trait ChainApi: Send + Sync {
fn hash(&self, uxt: &ExtrinsicFor<Self>) -> Self::Hash;
}
/// Maximum time the transaction will be kept in the pool.
///
/// Transactions that don't get included within the limit are removed from the pool.
const POOL_TIME: time::Duration = time::Duration::from_secs(60 * 5);
/// Additional transaction data
#[derive(Debug, Serialize, Deserialize)]
pub struct TxData<Ex> {
/// Raw data stored by the user.
pub raw: Ex,
/// Transaction validity deadline.
/// TODO [ToDr] Should we use longevity instead?
#[serde(skip)]
pub valid_till: Option<time::Instant>,
}
/// Pool configuration options.
#[derive(Debug, Clone, Default)]
pub struct Options;
@@ -97,7 +81,7 @@ pub struct Pool<B: ChainApi> {
listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>,
pool: RwLock<base::BasePool<
ExHash<B>,
TxData<ExtrinsicFor<B>>,
ExtrinsicFor<B>,
>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,
rotator: PoolRotator<ExHash<B>>,
@@ -121,17 +105,14 @@ impl<B: ChainApi> Pool<B> {
}
match self.api.validate_transaction(at, &xt)? {
TransactionValidity::Valid(priority, requires, provides, longevity)=> {
TransactionValidity::Valid(priority, requires, provides, longevity) => {
Ok(base::Transaction {
data: TxData {
raw: xt,
valid_till: Some(time::Instant::now() + POOL_TIME),
},
data: xt,
hash,
priority,
requires,
provides,
longevity,
valid_till: block_number.as_().saturating_add(longevity),
})
},
TransactionValidity::Invalid => {
@@ -144,7 +125,7 @@ impl<B: ChainApi> Pool<B> {
}
})
.map(|tx| {
let imported = self.pool.write().import(block_number.as_(), tx?)?;
let imported = self.pool.write().import(tx?)?;
self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok());
@@ -168,10 +149,7 @@ impl<B: ChainApi> Pool<B> {
/// Prunes ready transactions that provide given list of tags.
pub fn prune_tags(&self, at: &BlockId<B::Block>, tags: impl IntoIterator<Item=Tag>) -> Result<(), B::Error> {
let block_number = self.api.block_id_to_number(at)?
.ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?;
let status = self.pool.write().prune_tags(block_number.as_(), tags);
let status = self.pool.write().prune_tags(tags);
{
let mut listener = self.listener.write();
for promoted in &status.promoted {
@@ -183,7 +161,7 @@ impl<B: ChainApi> Pool<B> {
}
// try to re-submit pruned transactions since some of them might be still valid.
let hashes = status.pruned.iter().map(|tx| tx.hash.clone()).collect::<Vec<_>>();
let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.raw.clone()))?;
let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()))?;
// Fire mined event for transactions that became invalid.
let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) {
Err(Ok(err)) => match err.kind() {
@@ -210,15 +188,29 @@ impl<B: ChainApi> Pool<B> {
/// Stale transactions are transaction beyond their longevity period.
/// Note this function does not remove transactions that are already included in the chain.
/// See `prune_tags` ifyou want this.
pub fn clear_stale(&self, _at: &BlockId<B::Block>) -> Result<(), B::Error> {
pub fn clear_stale(&self, at: &BlockId<B::Block>) -> Result<(), B::Error> {
let block_number = self.api.block_id_to_number(at)?
.ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?
.as_();
let now = time::Instant::now();
let to_remove = self.ready(|pending| pending
.filter(|tx| self.rotator.ban_if_stale(&now, &tx))
.filter(|tx| self.rotator.ban_if_stale(&now, block_number, &tx))
.map(|tx| tx.hash.clone())
.collect::<Vec<_>>()
);
let futures_to_remove: Vec<ExHash<B>> = {
let p = self.pool.read();
let mut hashes = Vec::new();
for tx in p.futures() {
if self.rotator.ban_if_stale(&now, block_number, &tx) {
hashes.push(tx.hash.clone());
}
}
hashes
};
// removing old transactions
self.remove_invalid(&to_remove);
self.remove_invalid(&futures_to_remove);
// clear banned transactions timeouts
self.rotator.clear_timeouts(&now);
@@ -283,7 +275,7 @@ impl<B: ChainApi> Pool<B> {
///
/// Be careful with large limit values, as querying the entire pool might be time consuming.
pub fn all(&self, limit: usize) -> Vec<ExtrinsicFor<B>> {
self.ready(|it| it.take(limit).map(|ex| ex.data.raw.clone()).collect())
self.ready(|it| it.take(limit).map(|ex| ex.data.clone()).collect())
}
/// Returns pool status.