Revert "prune finalized transactions from the pool"

This reverts commit 0b59c1bfb9.
This commit is contained in:
Robert Habermeier
2018-04-14 15:35:54 +02:00
parent 0b59c1bfb9
commit 6e945e40ee
8 changed files with 48 additions and 52 deletions
+5 -24
View File
@@ -53,8 +53,8 @@ use futures::prelude::*;
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use codec::Slicable; use codec::Slicable;
use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash}; use runtime_io::with_externalities;
use primitives::hashing; use primitives::block::{Id as BlockId, TransactionHash};
use transaction_pool::TransactionPool; use transaction_pool::TransactionPool;
use substrate_executor::NativeExecutor; use substrate_executor::NativeExecutor;
use polkadot_executor::Executor as LocalDispatch; use polkadot_executor::Executor as LocalDispatch;
@@ -71,6 +71,7 @@ pub use config::{Configuration, Role, ChainSpec};
type Client = client::Client<InMemory, NativeExecutor<LocalDispatch>>; type Client = client::Client<InMemory, NativeExecutor<LocalDispatch>>;
/// Polkadot service. /// Polkadot service.
pub struct Service { pub struct Service {
thread: Option<thread::JoinHandle<()>>, thread: Option<thread::JoinHandle<()>>,
@@ -86,7 +87,7 @@ struct TransactionPoolAdapter {
} }
impl network::TransactionPool for TransactionPoolAdapter { impl network::TransactionPool for TransactionPoolAdapter {
fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)> { fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)> {
let best_block = match self.client.info() { let best_block = match self.client.info() {
Ok(info) => info.chain.best_hash, Ok(info) => info.chain.best_hash,
Err(e) => { Err(e) => {
@@ -103,7 +104,7 @@ impl network::TransactionPool for TransactionPoolAdapter {
}).collect() }).collect()
} }
fn import(&self, transaction: &[u8]) -> Option<ExtrinsicHash> { fn import(&self, transaction: &[u8]) -> Option<TransactionHash> {
if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) { if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) {
match self.pool.lock().import(tx) { match self.pool.lock().import(tx) {
Ok(t) => Some(t.hash()[..].into()), Ok(t) => Some(t.hash()[..].into()),
@@ -298,14 +299,11 @@ impl Service {
let thread_client = client.clone(); let thread_client = client.clone();
let thread_network = network.clone(); let thread_network = network.clone();
let thread_txpool = transaction_pool.clone();
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
thread_network.start_network(); thread_network.start_network();
let mut core = Core::new().expect("tokio::Core could not be created"); let mut core = Core::new().expect("tokio::Core could not be created");
let events = thread_client.import_notification_stream().for_each(|notification| { let events = thread_client.import_notification_stream().for_each(|notification| {
thread_network.on_block_imported(notification.hash, &notification.header); thread_network.on_block_imported(notification.hash, &notification.header);
prune_imported(&*thread_client, &*thread_txpool, notification.hash);
Ok(()) Ok(())
}); });
if let Err(e) = core.run(events) { if let Err(e) = core.run(events) {
@@ -338,23 +336,6 @@ impl Service {
} }
} }
fn prune_transactions(pool: &mut TransactionPool, extrinsics: &[Extrinsic]) {
for extrinsic in extrinsics {
let hash: _ = hashing::blake2_256(&extrinsic.encode()).into();
pool.remove(&hash, true);
}
}
/// Produce a task which prunes any finalized transactions from the pool.
pub fn prune_imported(client: &Client, pool: &Mutex<TransactionPool>, hash: HeaderHash) {
let id = BlockId::Hash(hash);
match client.body(&id) {
Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]),
Ok(None) => warn!("Missing imported block {:?}", hash),
Err(e) => warn!("Failed to fetch block: {:?}", e),
}
}
impl Drop for Service { impl Drop for Service {
fn drop(&mut self) { fn drop(&mut self) {
self.client.stop_notifications(); self.client.stop_notifications();
@@ -20,7 +20,7 @@ use std::vec::Vec;
use codec::{Joiner, Slicable}; use codec::{Joiner, Slicable};
use state_machine::{self, CodeExecutor}; use state_machine::{self, CodeExecutor};
use primitives::{Header, Block}; use primitives::{Header, Block};
use primitives::block::{Id as BlockId, Extrinsic}; use primitives::block::{Id as BlockId, Transaction};
use {backend, error, Client}; use {backend, error, Client};
use triehash::ordered_trie_root; use triehash::ordered_trie_root;
@@ -31,7 +31,7 @@ pub struct BlockBuilder<B, E> where
error::Error: From<<<B as backend::Backend>::State as state_machine::backend::Backend>::Error>, error::Error: From<<<B as backend::Backend>::State as state_machine::backend::Backend>::Error>,
{ {
header: Header, header: Header,
transactions: Vec<Extrinsic>, transactions: Vec<Transaction>,
executor: E, executor: E,
state: B::State, state: B::State,
changes: state_machine::OverlayedChanges, changes: state_machine::OverlayedChanges,
@@ -68,7 +68,7 @@ impl<B, E> BlockBuilder<B, E> where
/// Push a transaction onto the block's list of transactions. This will ensure the transaction /// Push a transaction onto the block's list of transactions. This will ensure the transaction
/// can be validly executed (by executing it); if it is invalid, it'll be returned along with /// can be validly executed (by executing it); if it is invalid, it'll be returned along with
/// the error. Otherwise, it will return a mutable reference to self (in order to chain). /// the error. Otherwise, it will return a mutable reference to self (in order to chain).
pub fn push(&mut self, tx: Extrinsic) -> error::Result<()> { pub fn push(&mut self, tx: Transaction) -> error::Result<()> {
let output = state_machine::execute(&self.state, &mut self.changes, &self.executor, "execute_transaction", let output = state_machine::execute(&self.state, &mut self.changes, &self.executor, "execute_transaction",
&vec![].and(&self.header).and(&tx))?; &vec![].and(&self.header).and(&tx))?;
self.header = Header::decode(&mut &output[..]).expect("Header came straight out of runtime so must be valid"); self.header = Header::decode(&mut &output[..]).expect("Header came straight out of runtime so must be valid");
+4 -4
View File
@@ -453,7 +453,7 @@ mod tests {
use codec::Slicable; use codec::Slicable;
use keyring::Keyring; use keyring::Keyring;
use {primitives, genesis}; use {primitives, genesis};
use primitives::block::Extrinsic as PrimitiveExtrinsic; use primitives::block::Transaction as PrimitiveTransaction;
use test_runtime::genesismap::{GenesisConfig, additional_storage_with_genesis}; use test_runtime::genesismap::{GenesisConfig, additional_storage_with_genesis};
use test_runtime::{UncheckedTransaction, Transaction}; use test_runtime::{UncheckedTransaction, Transaction};
use test_runtime; use test_runtime;
@@ -559,12 +559,12 @@ mod tests {
} }
trait Signable { trait Signable {
fn signed(self) -> PrimitiveExtrinsic; fn signed(self) -> PrimitiveTransaction;
} }
impl Signable for Transaction { impl Signable for Transaction {
fn signed(self) -> PrimitiveExtrinsic { fn signed(self) -> PrimitiveTransaction {
let signature = Keyring::from_raw_public(self.from.clone()).unwrap().sign(&self.encode()); let signature = Keyring::from_raw_public(self.from.clone()).unwrap().sign(&self.encode());
PrimitiveExtrinsic::decode(&mut UncheckedTransaction { signature, tx: self }.encode().as_ref()).unwrap() PrimitiveTransaction::decode(&mut UncheckedTransaction { signature, tx: self }.encode().as_ref()).unwrap()
} }
} }
+4 -4
View File
@@ -21,7 +21,7 @@ use std::time;
use parking_lot::{RwLock, Mutex}; use parking_lot::{RwLock, Mutex};
use futures::sync::oneshot; use futures::sync::oneshot;
use serde_json; use serde_json;
use primitives::block::{HeaderHash, ExtrinsicHash, Number as BlockNumber, Header, Id as BlockId}; use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header, Id as BlockId};
use primitives::{Hash, blake2_256}; use primitives::{Hash, blake2_256};
use runtime_support::Hashable; use runtime_support::Hashable;
use network::{PeerId, NodeId}; use network::{PeerId, NodeId};
@@ -82,7 +82,7 @@ struct Peer {
/// Request timestamp /// Request timestamp
request_timestamp: Option<time::Instant>, request_timestamp: Option<time::Instant>,
/// Holds a set of transactions known to this peer. /// Holds a set of transactions known to this peer.
known_transactions: HashSet<ExtrinsicHash>, known_transactions: HashSet<TransactionHash>,
/// Holds a set of blocks known to this peer. /// Holds a set of blocks known to this peer.
known_blocks: HashSet<HeaderHash>, known_blocks: HashSet<HeaderHash>,
/// Request counter, /// Request counter,
@@ -443,7 +443,7 @@ impl Protocol {
} }
/// Called when peer sends us new transactions /// Called when peer sends us new transactions
pub fn propagate_transactions(&self, io: &mut SyncIo, transactions: &[(ExtrinsicHash, Vec<u8>)]) { pub fn propagate_transactions(&self, io: &mut SyncIo, transactions: &[(TransactionHash, Vec<u8>)]) {
// Accept transactions only when fully synced // Accept transactions only when fully synced
if self.sync.read().status().state != SyncState::Idle { if self.sync.read().status().state != SyncState::Idle {
return; return;
@@ -513,7 +513,7 @@ impl Protocol {
} }
} }
pub fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats> { pub fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> {
BTreeMap::new() BTreeMap::new()
} }
+6 -6
View File
@@ -21,7 +21,7 @@ use futures::sync::{oneshot, mpsc};
use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId, use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId,
NetworkConfiguration , NonReservedPeerMode, ErrorKind}; NetworkConfiguration , NonReservedPeerMode, ErrorKind};
use network_devp2p::{NetworkService}; use network_devp2p::{NetworkService};
use primitives::block::{ExtrinsicHash, Header, HeaderHash}; use primitives::block::{TransactionHash, Header, HeaderHash};
use primitives::Hash; use primitives::Hash;
use core_io::{TimerToken}; use core_io::{TimerToken};
use io::NetSyncIo; use io::NetSyncIo;
@@ -66,15 +66,15 @@ pub trait SyncProvider: Send + Sync {
/// Get this node id if available. /// Get this node id if available.
fn node_id(&self) -> Option<String>; fn node_id(&self) -> Option<String>;
/// Returns propagation count for pending transactions. /// Returns propagation count for pending transactions.
fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats>; fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats>;
} }
/// Transaction pool interface /// Transaction pool interface
pub trait TransactionPool: Send + Sync { pub trait TransactionPool: Send + Sync {
/// Get transactions from the pool that are ready to be propagated. /// Get transactions from the pool that are ready to be propagated.
fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)>; fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)>;
/// Import a transction into the pool. /// Import a transction into the pool.
fn import(&self, transaction: &[u8]) -> Option<ExtrinsicHash>; fn import(&self, transaction: &[u8]) -> Option<TransactionHash>;
} }
/// ConsensusService /// ConsensusService
@@ -161,7 +161,7 @@ impl Service {
} }
/// Called when new transactons are imported by the client. /// Called when new transactons are imported by the client.
pub fn on_new_transactions(&self, transactions: &[(ExtrinsicHash, Vec<u8>)]) { pub fn on_new_transactions(&self, transactions: &[(TransactionHash, Vec<u8>)]) {
self.network.with_context(DOT_PROTOCOL_ID, |context| { self.network.with_context(DOT_PROTOCOL_ID, |context| {
self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context), transactions); self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context), transactions);
}); });
@@ -223,7 +223,7 @@ impl SyncProvider for Service {
self.network.external_url() self.network.external_url()
} }
fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats> { fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> {
self.handler.protocol.transactions_stats() self.handler.protocol.transactions_stats()
} }
} }
+4 -4
View File
@@ -21,7 +21,7 @@ use std::sync::Arc;
use parking_lot::RwLock; use parking_lot::RwLock;
use client::{self, genesis, BlockOrigin}; use client::{self, genesis, BlockOrigin};
use client::block_builder::BlockBuilder; use client::block_builder::BlockBuilder;
use primitives::block::{Id as BlockId, ExtrinsicHash}; use primitives::block::{Id as BlockId, TransactionHash};
use primitives; use primitives;
use executor; use executor;
use io::SyncIo; use io::SyncIo;
@@ -206,7 +206,7 @@ impl Peer {
nonce: nonce, nonce: nonce,
}; };
let signature = Keyring::from_raw_public(tx.from.clone()).unwrap().sign(&tx.encode()); let signature = Keyring::from_raw_public(tx.from.clone()).unwrap().sign(&tx.encode());
let tx = primitives::block::Extrinsic::decode(&mut test_runtime::UncheckedTransaction { signature, tx: tx }.encode().as_ref()).unwrap(); let tx = primitives::block::Transaction::decode(&mut test_runtime::UncheckedTransaction { signature, tx: tx }.encode().as_ref()).unwrap();
builder.push(tx).unwrap(); builder.push(tx).unwrap();
nonce = nonce + 1; nonce = nonce + 1;
}); });
@@ -219,11 +219,11 @@ impl Peer {
struct EmptyTransactionPool; struct EmptyTransactionPool;
impl TransactionPool for EmptyTransactionPool { impl TransactionPool for EmptyTransactionPool {
fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)> { fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)> {
Vec::new() Vec::new()
} }
fn import(&self, _transaction: &[u8]) -> Option<ExtrinsicHash> { fn import(&self, _transaction: &[u8]) -> Option<TransactionHash> {
None None
} }
} }
+21 -6
View File
@@ -29,8 +29,23 @@ pub type Number = u64;
/// Hash used to refer to a block hash. /// Hash used to refer to a block hash.
pub type HeaderHash = Hash; pub type HeaderHash = Hash;
/// Hash used to refer to an extrinsic. /// Hash used to refer to a transaction hash.
pub type ExtrinsicHash = Hash; pub type TransactionHash = Hash;
/// Simple generic transaction type.
#[derive(PartialEq, Eq, Clone)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
pub struct Transaction(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u8>);
impl Slicable for Transaction {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Vec::<u8>::decode(input).map(Transaction)
}
fn using_encoded<R, F: FnOnce(&[u8]) -> R>(&self, f: F) -> R {
self.0.using_encoded(f)
}
}
/// Simple generic extrinsic type. /// Simple generic extrinsic type.
#[derive(PartialEq, Eq, Clone)] #[derive(PartialEq, Eq, Clone)]
@@ -111,11 +126,11 @@ pub mod generic {
} }
} }
/// The body of a block is just a bunch of extrinsics. /// The body of a block is just a bunch of transactions.
pub type Body = Vec<Extrinsic>; pub type Body = Vec<Transaction>;
/// The header and body of a concrete, but unspecialised, block. Used by substrate to represent a /// The header and body of a concrete, but unspecialised, block. Used by substrate to represent a
/// block some fields of which the runtime alone knows how to interpret (e.g. the transactions). /// block some fields of which the runtime alone knows how to interpret (e.g. the transactions).
pub type Block = generic::Block<Extrinsic>; pub type Block = generic::Block<Transaction>;
/// A substrate chain block header. /// A substrate chain block header.
// TODO: split out into light-client-specific fields and runtime-specific fields. // TODO: split out into light-client-specific fields and runtime-specific fields.
@@ -255,7 +270,7 @@ mod tests {
fn test_block_encoding() { fn test_block_encoding() {
let block = Block { let block = Block {
header: Header::from_block_number(12), header: Header::from_block_number(12),
transactions: vec![Extrinsic(vec!(4))], transactions: vec![Transaction(vec!(4))],
}; };
assert_eq!(block.encode(), vec![ assert_eq!(block.encode(), vec![
@@ -223,7 +223,7 @@ pub trait Block {
} }
impl Block for substrate_primitives::Block { impl Block for substrate_primitives::Block {
type Extrinsic = substrate_primitives::block::Extrinsic; type Extrinsic = substrate_primitives::block::Transaction;
type Header = substrate_primitives::Header; type Header = substrate_primitives::Header;
fn header(&self) -> &Self::Header { fn header(&self) -> &Self::Header {
&self.header &self.header