mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 13:01:07 +00:00
Add transactions from retracted blocks back to the pool (#3562)
* Add transactions from retracted blocks back to the pool * Line width * Reverse retracted
This commit is contained in:
committed by
Gavin Wood
parent
a381f033bd
commit
0df2be5760
@@ -250,7 +250,7 @@ mod tests {
|
||||
let chain_api = transaction_pool::ChainApi::new(client.clone());
|
||||
let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api));
|
||||
|
||||
txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)]).unwrap();
|
||||
txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false).unwrap();
|
||||
|
||||
let mut proposer_factory = ProposerFactory {
|
||||
client: client.clone(),
|
||||
|
||||
@@ -35,6 +35,15 @@ pub type StorageCollection = Vec<(Vec<u8>, Option<Vec<u8>>)>;
|
||||
/// In memory arrays of storage values for multiple child tries.
|
||||
pub type ChildStorageCollection = Vec<(Vec<u8>, StorageCollection)>;
|
||||
|
||||
pub(crate) struct ImportSummary<Block: BlockT> {
|
||||
pub(crate) hash: Block::Hash,
|
||||
pub(crate) origin: BlockOrigin,
|
||||
pub(crate) header: Block::Header,
|
||||
pub(crate) is_new_best: bool,
|
||||
pub(crate) storage_changes: Option<(StorageCollection, ChildStorageCollection)>,
|
||||
pub(crate) retracted: Vec<Block::Hash>,
|
||||
}
|
||||
|
||||
/// Import operation wrapper
|
||||
pub struct ClientImportOperation<
|
||||
Block: BlockT,
|
||||
@@ -42,15 +51,7 @@ pub struct ClientImportOperation<
|
||||
B: Backend<Block, H>,
|
||||
> {
|
||||
pub(crate) op: B::BlockImportOperation,
|
||||
pub(crate) notify_imported: Option<(
|
||||
Block::Hash,
|
||||
BlockOrigin,
|
||||
Block::Header,
|
||||
bool,
|
||||
Option<(
|
||||
StorageCollection,
|
||||
ChildStorageCollection,
|
||||
)>)>,
|
||||
pub(crate) notify_imported: Option<ImportSummary<Block>>,
|
||||
pub(crate) notify_finalized: Vec<Block::Hash>,
|
||||
}
|
||||
|
||||
|
||||
@@ -62,7 +62,7 @@ use crate::{
|
||||
},
|
||||
backend::{
|
||||
self, BlockImportOperation, PrunableStateChangesTrieStorage,
|
||||
ClientImportOperation, Finalizer,
|
||||
ClientImportOperation, Finalizer, ImportSummary,
|
||||
},
|
||||
blockchain::{
|
||||
self, Info as ChainInfo, Backend as ChainBackend,
|
||||
@@ -199,6 +199,8 @@ pub struct BlockImportNotification<Block: BlockT> {
|
||||
pub header: Block::Header,
|
||||
/// Is this the new best block.
|
||||
pub is_new_best: bool,
|
||||
/// List of retracted blocks ordered by block number.
|
||||
pub retracted: Vec<Block::Hash>,
|
||||
}
|
||||
|
||||
/// Summary of a finalized block.
|
||||
@@ -968,6 +970,17 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
|
||||
crate::backend::NewBlockState::Normal
|
||||
};
|
||||
|
||||
let retracted = if is_new_best {
|
||||
let route_from_best = crate::blockchain::tree_route(
|
||||
|id| self.header(&id)?.ok_or_else(|| Error::UnknownBlock(format!("{:?}", id))),
|
||||
BlockId::Hash(info.best_hash),
|
||||
BlockId::Hash(parent_hash),
|
||||
)?;
|
||||
route_from_best.retracted().iter().rev().map(|e| e.hash.clone()).collect()
|
||||
} else {
|
||||
Vec::default()
|
||||
};
|
||||
|
||||
trace!("Imported {}, (#{}), best={}, origin={:?}", hash, import_headers.post().number(), is_new_best, origin);
|
||||
|
||||
operation.op.set_block_data(
|
||||
@@ -995,7 +1008,14 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
|
||||
operation.notify_finalized.push(hash);
|
||||
}
|
||||
|
||||
operation.notify_imported = Some((hash, origin, import_headers.into_post(), is_new_best, storage_changes));
|
||||
operation.notify_imported = Some(ImportSummary {
|
||||
hash,
|
||||
origin,
|
||||
header: import_headers.into_post(),
|
||||
is_new_best,
|
||||
storage_changes,
|
||||
retracted,
|
||||
})
|
||||
}
|
||||
|
||||
Ok(ImportResult::imported(is_new_best))
|
||||
@@ -1167,33 +1187,24 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
|
||||
|
||||
fn notify_imported(
|
||||
&self,
|
||||
notify_import: (
|
||||
Block::Hash, BlockOrigin,
|
||||
Block::Header,
|
||||
bool,
|
||||
Option<(
|
||||
Vec<(Vec<u8>, Option<Vec<u8>>)>,
|
||||
Vec<(Vec<u8>, Vec<(Vec<u8>, Option<Vec<u8>>)>)>,
|
||||
)
|
||||
>),
|
||||
notify_import: ImportSummary<Block>,
|
||||
) -> error::Result<()> {
|
||||
let (hash, origin, header, is_new_best, storage_changes) = notify_import;
|
||||
|
||||
if let Some(storage_changes) = storage_changes {
|
||||
if let Some(storage_changes) = notify_import.storage_changes {
|
||||
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
|
||||
self.storage_notifications.lock()
|
||||
.trigger(
|
||||
&hash,
|
||||
¬ify_import.hash,
|
||||
storage_changes.0.into_iter(),
|
||||
storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())),
|
||||
);
|
||||
}
|
||||
|
||||
let notification = BlockImportNotification::<Block> {
|
||||
hash,
|
||||
origin,
|
||||
header,
|
||||
is_new_best,
|
||||
hash: notify_import.hash,
|
||||
origin: notify_import.origin,
|
||||
header: notify_import.header,
|
||||
is_new_best: notify_import.is_new_best,
|
||||
retracted: notify_import.retracted,
|
||||
};
|
||||
|
||||
self.import_notification_sinks.lock()
|
||||
|
||||
@@ -472,6 +472,7 @@ mod tests {
|
||||
origin: BlockOrigin::File,
|
||||
header,
|
||||
is_new_best: false,
|
||||
retracted: vec![],
|
||||
}).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -871,7 +871,7 @@ ServiceBuilder<
|
||||
dht_event_tx,
|
||||
))
|
||||
},
|
||||
|h, c, tx| maintain_transaction_pool(h, c, tx),
|
||||
|h, c, tx, r| maintain_transaction_pool(h, c, tx, r),
|
||||
|n, o, p, ns, v| offchain_workers(n, o, p, ns, v),
|
||||
|c, ssb, si, te, tp, ext, ks| start_rpc(&rpc_builder, c, ssb, si, te, tp, ext, ks),
|
||||
)
|
||||
@@ -924,6 +924,7 @@ pub(crate) fn maintain_transaction_pool<Api, Backend, Block, Executor, PoolApi>(
|
||||
id: &BlockId<Block>,
|
||||
client: &Client<Backend, Executor, Block, Api>,
|
||||
transaction_pool: &TransactionPool<PoolApi>,
|
||||
retracted: &[Block::Hash],
|
||||
) -> error::Result<()> where
|
||||
Block: BlockT<Hash = <Blake2Hasher as primitives::Hasher>::Out>,
|
||||
Backend: client::backend::Backend<Block, Blake2Hasher>,
|
||||
@@ -932,6 +933,16 @@ pub(crate) fn maintain_transaction_pool<Api, Backend, Block, Executor, PoolApi>(
|
||||
Executor: client::CallExecutor<Block, Blake2Hasher>,
|
||||
PoolApi: txpool::ChainApi<Hash = Block::Hash, Block = Block>,
|
||||
{
|
||||
// Put transactions from retracted blocks back into the pool.
|
||||
for r in retracted {
|
||||
if let Some(block) = client.block(&BlockId::hash(*r))? {
|
||||
let extrinsics = block.block.extrinsics();
|
||||
if let Err(e) = transaction_pool.submit_at(id, extrinsics.iter().cloned(), true) {
|
||||
warn!("Error re-submitting transactions: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Avoid calling into runtime if there is nothing to prune from the pool anyway.
|
||||
if transaction_pool.status().is_empty() {
|
||||
return Ok(())
|
||||
@@ -1007,10 +1018,67 @@ mod tests {
|
||||
&id,
|
||||
&client,
|
||||
&pool,
|
||||
&[]
|
||||
).unwrap();
|
||||
|
||||
// then
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
assert_eq!(pool.status().future, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_add_reverted_transactions_to_the_pool() {
|
||||
let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain();
|
||||
let client = Arc::new(client);
|
||||
let pool = TransactionPool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone()));
|
||||
let transaction = Transfer {
|
||||
amount: 5,
|
||||
nonce: 0,
|
||||
from: AccountKeyring::Alice.into(),
|
||||
to: Default::default(),
|
||||
}.into_signed_tx();
|
||||
let best = longest_chain.best_chain().unwrap();
|
||||
|
||||
// store the transaction in the pool
|
||||
pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap();
|
||||
|
||||
// import the block
|
||||
let mut builder = client.new_block(Default::default()).unwrap();
|
||||
builder.push(transaction.clone()).unwrap();
|
||||
let block = builder.bake().unwrap();
|
||||
let block1_hash = block.header().hash();
|
||||
let id = BlockId::hash(block1_hash.clone());
|
||||
client.import(BlockOrigin::Own, block).unwrap();
|
||||
|
||||
// fire notification - this should clean up the queue
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
maintain_transaction_pool(
|
||||
&id,
|
||||
&client,
|
||||
&pool,
|
||||
&[]
|
||||
).unwrap();
|
||||
|
||||
// then
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
assert_eq!(pool.status().future, 0);
|
||||
|
||||
// import second block
|
||||
let builder = client.new_block_at(&BlockId::hash(best.hash()), Default::default()).unwrap();
|
||||
let block = builder.bake().unwrap();
|
||||
let id = BlockId::hash(block.header().hash());
|
||||
client.import(BlockOrigin::Own, block).unwrap();
|
||||
|
||||
// fire notification - this should add the transaction back to the pool.
|
||||
maintain_transaction_pool(
|
||||
&id,
|
||||
&client,
|
||||
&pool,
|
||||
&[block1_hash]
|
||||
).unwrap();
|
||||
|
||||
// then
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
assert_eq!(pool.status().future, 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -238,6 +238,7 @@ macro_rules! new_impl {
|
||||
&BlockId::hash(notification.hash),
|
||||
&*client,
|
||||
&*txpool,
|
||||
¬ification.retracted,
|
||||
).map_err(|e| warn!("Pool error processing new block: {:?}", e))?;
|
||||
}
|
||||
|
||||
|
||||
@@ -114,7 +114,9 @@ pub struct Pool<B: ChainApi> {
|
||||
|
||||
impl<B: ChainApi> Pool<B> {
|
||||
/// Imports a bunch of unverified extrinsics to the pool
|
||||
pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T) -> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error> where
|
||||
pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T, force: bool)
|
||||
-> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error>
|
||||
where
|
||||
T: IntoIterator<Item=ExtrinsicFor<B>>
|
||||
{
|
||||
let block_number = self.api.block_id_to_number(at)?
|
||||
@@ -124,7 +126,7 @@ impl<B: ChainApi> Pool<B> {
|
||||
.into_iter()
|
||||
.map(|xt| -> Result<_, B::Error> {
|
||||
let (hash, bytes) = self.api.hash_and_length(&xt);
|
||||
if self.rotator.is_banned(&hash) {
|
||||
if !force && self.rotator.is_banned(&hash) {
|
||||
return Err(error::Error::TemporarilyBanned.into())
|
||||
}
|
||||
|
||||
@@ -207,7 +209,7 @@ impl<B: ChainApi> Pool<B> {
|
||||
|
||||
/// Imports one unverified extrinsic to the pool
|
||||
pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<ExHash<B>, B::Error> {
|
||||
Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")?)
|
||||
Ok(self.submit_at(at, ::std::iter::once(xt), false)?.pop().expect("One extrinsic passed; one result returned; qed")?)
|
||||
}
|
||||
|
||||
/// Import a single extrinsic and starts to watch their progress in the pool.
|
||||
@@ -306,7 +308,7 @@ impl<B: ChainApi> Pool<B> {
|
||||
// try to re-submit pruned transactions since some of them might be still valid.
|
||||
// note that `known_imported_hashes` will be rejected here due to temporary ban.
|
||||
let hashes = status.pruned.iter().map(|tx| tx.hash.clone()).collect::<Vec<_>>();
|
||||
let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()))?;
|
||||
let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()), false)?;
|
||||
|
||||
// Collect the hashes of transactions that now became invalid (meaning that they are successfully pruned).
|
||||
let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) {
|
||||
|
||||
Reference in New Issue
Block a user