Extrinsic pool (#182)

* Use latest version of txpool.

* Initial version of the pool.

* Fix abstraction.

* Implement watchers and notifications.

* Return hash from RPC.

* Remove commented code.

* Remove client dep.

* Fix tests.
This commit is contained in:
Tomasz Drwięga
2018-05-31 22:49:17 +02:00
committed by Gav Wood
parent 44eaa4a180
commit 1dada4f7a0
27 changed files with 770 additions and 315 deletions
+29 -35
View File
@@ -78,7 +78,6 @@ use tokio_core::reactor::{Handle, Timeout, Interval};
use futures::prelude::*;
use futures::future::{self, Shared};
use parking_lot::Mutex;
use collation::CollationFetch;
use dynamic_inclusion::DynamicInclusion;
@@ -226,7 +225,7 @@ pub struct ProposerFactory<C, N, P> {
/// The client instance.
pub client: Arc<C>,
/// The transaction pool.
pub transaction_pool: Arc<Mutex<TransactionPool>>,
pub transaction_pool: Arc<TransactionPool>,
/// The backing network handle.
pub network: N,
/// Parachain collators.
@@ -319,7 +318,7 @@ pub struct Proposer<C: PolkadotApi, R, P> {
random_seed: Hash,
router: R,
table: Arc<SharedTable>,
transaction_pool: Arc<Mutex<TransactionPool>>,
transaction_pool: Arc<TransactionPool>,
}
impl<C, R, P> bft::Proposer for Proposer<C, R, P>
@@ -495,17 +494,16 @@ impl<C, R, P> bft::Proposer for Proposer<C, R, P>
use primitives::bft::{MisbehaviorKind, MisbehaviorReport};
use polkadot_runtime::{Call, Extrinsic, UncheckedExtrinsic, ConsensusCall};
let local_id = self.local_key.public().0;
let mut pool = self.transaction_pool.lock();
let mut next_index = {
let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client);
pool.cull(None, readiness_evaluator.clone());
let cur_index = pool.pending(readiness_evaluator)
let cur_index = self.transaction_pool.cull_and_get_pending(readiness_evaluator, |pending| pending
.filter(|tx| tx.as_ref().as_ref().signed == local_id)
.last()
.map(|tx| Ok(tx.as_ref().as_ref().index))
.unwrap_or_else(|| self.client.index(&self.parent_id, local_id));
.unwrap_or_else(|| self.client.index(&self.parent_id, local_id))
);
match cur_index {
Ok(cur_index) => cur_index + 1,
@@ -541,7 +539,7 @@ impl<C, R, P> bft::Proposer for Proposer<C, R, P>
let signature = self.local_key.sign(&extrinsic.encode()).into();
let uxt = UncheckedExtrinsic { extrinsic, signature };
pool.import(uxt).expect("locally signed extrinsic is valid; qed");
self.transaction_pool.import_unchecked_extrinsic(uxt).expect("locally signed extrinsic is valid; qed");
}
}
}
@@ -609,7 +607,7 @@ pub struct CreateProposal<C: PolkadotApi, R, P: Collators> {
parent_number: BlockNumber,
parent_id: C::CheckedBlockId,
client: Arc<C>,
transaction_pool: Arc<Mutex<TransactionPool>>,
transaction_pool: Arc<TransactionPool>,
collation: CollationFetch<P, C>,
router: R,
table: Arc<SharedTable>,
@@ -631,37 +629,33 @@ impl<C, R, P> CreateProposal<C, R, P>
candidates,
)?;
let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client);
{
let mut pool = self.transaction_pool.lock();
let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client);
let mut unqueue_invalid = Vec::new();
let mut pending_size = 0;
pool.cull(None, readiness_evaluator.clone());
for pending in pool.pending(readiness_evaluator.clone()) {
// skip and cull transactions which are too large.
if pending.encoded_size() > MAX_TRANSACTIONS_SIZE {
unqueue_invalid.push(pending.hash().clone());
continue
}
if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break }
match block_builder.push_extrinsic(pending.as_transaction().clone()) {
Ok(()) => {
pending_size += pending.encoded_size();
}
Err(e) => {
trace!(target: "transaction-pool", "Invalid transaction: {}", e);
self.transaction_pool.cull_and_get_pending(readiness_evaluator, |pending_iterator| {
let mut pending_size = 0;
for pending in pending_iterator {
// skip and cull transactions which are too large.
if pending.encoded_size() > MAX_TRANSACTIONS_SIZE {
unqueue_invalid.push(pending.hash().clone());
continue
}
if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break }
match block_builder.push_extrinsic(pending.as_transaction().clone()) {
Ok(()) => {
pending_size += pending.encoded_size();
}
Err(e) => {
trace!(target: "transaction-pool", "Invalid transaction: {}", e);
unqueue_invalid.push(pending.hash().clone());
}
}
}
}
});
for tx_hash in unqueue_invalid {
pool.remove(&tx_hash, false);
}
self.transaction_pool.remove(&unqueue_invalid, false);
}
let polkadot_block = block_builder.bake();
+1 -2
View File
@@ -28,7 +28,6 @@ use client::{BlockchainEvents, ChainHead};
use ed25519;
use futures::prelude::*;
use futures::{future, Canceled};
use parking_lot::Mutex;
use polkadot_api::LocalPolkadotApi;
use polkadot_primitives::AccountId;
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
@@ -237,7 +236,7 @@ impl Service {
client: Arc<C>,
api: Arc<A>,
network: Arc<net::ConsensusService>,
transaction_pool: Arc<Mutex<TransactionPool>>,
transaction_pool: Arc<TransactionPool>,
parachain_empty_duration: Duration,
key: ed25519::Pair,
) -> Service