prune finalized transactions from the pool (#127)

This commit is contained in:
Robert Habermeier
2018-04-15 12:53:53 +02:00
committed by Gav Wood
parent 6e945e40ee
commit 2a53d414a3
8 changed files with 52 additions and 48 deletions
+24 -5
View File
@@ -53,8 +53,8 @@ use futures::prelude::*;
use parking_lot::Mutex;
use tokio_core::reactor::Core;
use codec::Slicable;
use runtime_io::with_externalities;
use primitives::block::{Id as BlockId, TransactionHash};
use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash};
use primitives::hashing;
use transaction_pool::TransactionPool;
use substrate_executor::NativeExecutor;
use polkadot_executor::Executor as LocalDispatch;
@@ -71,7 +71,6 @@ pub use config::{Configuration, Role, ChainSpec};
type Client = client::Client<InMemory, NativeExecutor<LocalDispatch>>;
/// Polkadot service.
pub struct Service {
thread: Option<thread::JoinHandle<()>>,
@@ -87,7 +86,7 @@ struct TransactionPoolAdapter {
}
impl network::TransactionPool for TransactionPoolAdapter {
fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)> {
fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)> {
let best_block = match self.client.info() {
Ok(info) => info.chain.best_hash,
Err(e) => {
@@ -104,7 +103,7 @@ impl network::TransactionPool for TransactionPoolAdapter {
}).collect()
}
fn import(&self, transaction: &[u8]) -> Option<TransactionHash> {
fn import(&self, transaction: &[u8]) -> Option<ExtrinsicHash> {
if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) {
match self.pool.lock().import(tx) {
Ok(t) => Some(t.hash()[..].into()),
@@ -299,11 +298,14 @@ impl Service {
let thread_client = client.clone();
let thread_network = network.clone();
let thread_txpool = transaction_pool.clone();
let thread = thread::spawn(move || {
thread_network.start_network();
let mut core = Core::new().expect("tokio::Core could not be created");
let events = thread_client.import_notification_stream().for_each(|notification| {
thread_network.on_block_imported(notification.hash, &notification.header);
prune_imported(&*thread_client, &*thread_txpool, notification.hash);
Ok(())
});
if let Err(e) = core.run(events) {
@@ -336,6 +338,23 @@ impl Service {
}
}
fn prune_transactions(pool: &mut TransactionPool, extrinsics: &[Extrinsic]) {
for extrinsic in extrinsics {
let hash: _ = hashing::blake2_256(&extrinsic.encode()).into();
pool.remove(&hash, true);
}
}
/// Produce a task which prunes any finalized transactions from the pool.
pub fn prune_imported(client: &Client, pool: &Mutex<TransactionPool>, hash: HeaderHash) {
let id = BlockId::Hash(hash);
match client.body(&id) {
Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]),
Ok(None) => warn!("Missing imported block {:?}", hash),
Err(e) => warn!("Failed to fetch block: {:?}", e),
}
}
impl Drop for Service {
fn drop(&mut self) {
self.client.stop_notifications();
@@ -20,7 +20,7 @@ use std::vec::Vec;
use codec::{Joiner, Slicable};
use state_machine::{self, CodeExecutor};
use primitives::{Header, Block};
use primitives::block::{Id as BlockId, Transaction};
use primitives::block::{Id as BlockId, Extrinsic};
use {backend, error, Client};
use triehash::ordered_trie_root;
@@ -31,7 +31,7 @@ pub struct BlockBuilder<B, E> where
error::Error: From<<<B as backend::Backend>::State as state_machine::backend::Backend>::Error>,
{
header: Header,
transactions: Vec<Transaction>,
transactions: Vec<Extrinsic>,
executor: E,
state: B::State,
changes: state_machine::OverlayedChanges,
@@ -68,7 +68,7 @@ impl<B, E> BlockBuilder<B, E> where
/// Push a transaction onto the block's list of transactions. This will ensure the transaction
/// can be validly executed (by executing it); if it is invalid, it'll be returned along with
/// the error. Otherwise, it will return a mutable reference to self (in order to chain).
pub fn push(&mut self, tx: Transaction) -> error::Result<()> {
pub fn push(&mut self, tx: Extrinsic) -> error::Result<()> {
let output = state_machine::execute(&self.state, &mut self.changes, &self.executor, "execute_transaction",
&vec![].and(&self.header).and(&tx))?;
self.header = Header::decode(&mut &output[..]).expect("Header came straight out of runtime so must be valid");
+4 -4
View File
@@ -453,7 +453,7 @@ mod tests {
use codec::Slicable;
use keyring::Keyring;
use {primitives, genesis};
use primitives::block::Transaction as PrimitiveTransaction;
use primitives::block::Extrinsic as PrimitiveExtrinsic;
use test_runtime::genesismap::{GenesisConfig, additional_storage_with_genesis};
use test_runtime::{UncheckedTransaction, Transaction};
use test_runtime;
@@ -559,12 +559,12 @@ mod tests {
}
trait Signable {
fn signed(self) -> PrimitiveTransaction;
fn signed(self) -> PrimitiveExtrinsic;
}
impl Signable for Transaction {
fn signed(self) -> PrimitiveTransaction {
fn signed(self) -> PrimitiveExtrinsic {
let signature = Keyring::from_raw_public(self.from.clone()).unwrap().sign(&self.encode());
PrimitiveTransaction::decode(&mut UncheckedTransaction { signature, tx: self }.encode().as_ref()).unwrap()
PrimitiveExtrinsic::decode(&mut UncheckedTransaction { signature, tx: self }.encode().as_ref()).unwrap()
}
}
+4 -4
View File
@@ -21,7 +21,7 @@ use std::time;
use parking_lot::{RwLock, Mutex};
use futures::sync::oneshot;
use serde_json;
use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header, Id as BlockId};
use primitives::block::{HeaderHash, ExtrinsicHash, Number as BlockNumber, Header, Id as BlockId};
use primitives::{Hash, blake2_256};
use runtime_support::Hashable;
use network::{PeerId, NodeId};
@@ -82,7 +82,7 @@ struct Peer {
/// Request timestamp
request_timestamp: Option<time::Instant>,
/// Holds a set of transactions known to this peer.
known_transactions: HashSet<TransactionHash>,
known_transactions: HashSet<ExtrinsicHash>,
/// Holds a set of blocks known to this peer.
known_blocks: HashSet<HeaderHash>,
/// Request counter,
@@ -443,7 +443,7 @@ impl Protocol {
}
/// Called when peer sends us new transactions
pub fn propagate_transactions(&self, io: &mut SyncIo, transactions: &[(TransactionHash, Vec<u8>)]) {
pub fn propagate_transactions(&self, io: &mut SyncIo, transactions: &[(ExtrinsicHash, Vec<u8>)]) {
// Accept transactions only when fully synced
if self.sync.read().status().state != SyncState::Idle {
return;
@@ -513,7 +513,7 @@ impl Protocol {
}
}
pub fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> {
pub fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats> {
BTreeMap::new()
}
+6 -6
View File
@@ -21,7 +21,7 @@ use futures::sync::{oneshot, mpsc};
use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId,
NetworkConfiguration , NonReservedPeerMode, ErrorKind};
use network_devp2p::{NetworkService};
use primitives::block::{TransactionHash, Header, HeaderHash};
use primitives::block::{ExtrinsicHash, Header, HeaderHash};
use primitives::Hash;
use core_io::{TimerToken};
use io::NetSyncIo;
@@ -66,15 +66,15 @@ pub trait SyncProvider: Send + Sync {
/// Get this node id if available.
fn node_id(&self) -> Option<String>;
/// Returns propagation count for pending transactions.
fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats>;
fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats>;
}
/// Transaction pool interface
pub trait TransactionPool: Send + Sync {
/// Get transactions from the pool that are ready to be propagated.
fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)>;
fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)>;
/// Import a transction into the pool.
fn import(&self, transaction: &[u8]) -> Option<TransactionHash>;
fn import(&self, transaction: &[u8]) -> Option<ExtrinsicHash>;
}
/// ConsensusService
@@ -161,7 +161,7 @@ impl Service {
}
/// Called when new transactons are imported by the client.
pub fn on_new_transactions(&self, transactions: &[(TransactionHash, Vec<u8>)]) {
pub fn on_new_transactions(&self, transactions: &[(ExtrinsicHash, Vec<u8>)]) {
self.network.with_context(DOT_PROTOCOL_ID, |context| {
self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context), transactions);
});
@@ -223,7 +223,7 @@ impl SyncProvider for Service {
self.network.external_url()
}
fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> {
fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats> {
self.handler.protocol.transactions_stats()
}
}
+4 -4
View File
@@ -21,7 +21,7 @@ use std::sync::Arc;
use parking_lot::RwLock;
use client::{self, genesis, BlockOrigin};
use client::block_builder::BlockBuilder;
use primitives::block::{Id as BlockId, TransactionHash};
use primitives::block::{Id as BlockId, ExtrinsicHash};
use primitives;
use executor;
use io::SyncIo;
@@ -206,7 +206,7 @@ impl Peer {
nonce: nonce,
};
let signature = Keyring::from_raw_public(tx.from.clone()).unwrap().sign(&tx.encode());
let tx = primitives::block::Transaction::decode(&mut test_runtime::UncheckedTransaction { signature, tx: tx }.encode().as_ref()).unwrap();
let tx = primitives::block::Extrinsic::decode(&mut test_runtime::UncheckedTransaction { signature, tx: tx }.encode().as_ref()).unwrap();
builder.push(tx).unwrap();
nonce = nonce + 1;
});
@@ -219,11 +219,11 @@ impl Peer {
struct EmptyTransactionPool;
impl TransactionPool for EmptyTransactionPool {
fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)> {
fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)> {
Vec::new()
}
fn import(&self, _transaction: &[u8]) -> Option<TransactionHash> {
fn import(&self, _transaction: &[u8]) -> Option<ExtrinsicHash> {
None
}
}
+6 -21
View File
@@ -29,23 +29,8 @@ pub type Number = u64;
/// Hash used to refer to a block hash.
pub type HeaderHash = Hash;
/// Hash used to refer to a transaction hash.
pub type TransactionHash = Hash;
/// Simple generic transaction type.
#[derive(PartialEq, Eq, Clone)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
pub struct Transaction(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u8>);
impl Slicable for Transaction {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Vec::<u8>::decode(input).map(Transaction)
}
fn using_encoded<R, F: FnOnce(&[u8]) -> R>(&self, f: F) -> R {
self.0.using_encoded(f)
}
}
/// Hash used to refer to an extrinsic.
pub type ExtrinsicHash = Hash;
/// Simple generic extrinsic type.
#[derive(PartialEq, Eq, Clone)]
@@ -126,11 +111,11 @@ pub mod generic {
}
}
/// The body of a block is just a bunch of transactions.
pub type Body = Vec<Transaction>;
/// The body of a block is just a bunch of extrinsics.
pub type Body = Vec<Extrinsic>;
/// The header and body of a concrete, but unspecialised, block. Used by substrate to represent a
/// block some fields of which the runtime alone knows how to interpret (e.g. the transactions).
pub type Block = generic::Block<Transaction>;
pub type Block = generic::Block<Extrinsic>;
/// A substrate chain block header.
// TODO: split out into light-client-specific fields and runtime-specific fields.
@@ -270,7 +255,7 @@ mod tests {
fn test_block_encoding() {
let block = Block {
header: Header::from_block_number(12),
transactions: vec![Transaction(vec!(4))],
transactions: vec![Extrinsic(vec!(4))],
};
assert_eq!(block.encode(), vec![
@@ -223,7 +223,7 @@ pub trait Block {
}
impl Block for substrate_primitives::Block {
type Extrinsic = substrate_primitives::block::Transaction;
type Extrinsic = substrate_primitives::block::Extrinsic;
type Header = substrate_primitives::Header;
fn header(&self) -> &Self::Header {
&self.header