Fix quadratic iterations over transaction pool (#4736)

* transaction pool changes

* service & network changes

* address review

* reduce future pool
This commit is contained in:
Nikolay Volf
2020-01-27 09:26:42 -08:00
committed by Gavin Wood
parent 76acc96f3a
commit ed3da9f903
12 changed files with 102 additions and 37 deletions
+25 -5
View File
@@ -1135,18 +1135,26 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
}
/// Call when we must propagate ready extrinsics to peers.
pub fn propagate_extrinsics(
/// Propagate one extrinsic.
pub fn propagate_extrinsic(
&mut self,
hash: &H,
) {
debug!(target: "sync", "Propagating extrinsics");
debug!(target: "sync", "Propagating extrinsic [{:?}]", hash);
// Accept transactions only when fully synced
if self.sync.status().state != SyncState::Idle {
return;
}
if let Some(extrinsic) = self.transaction_pool.transaction(hash) {
let propagated_to = self.do_propagate_extrinsics(&[(hash.clone(), extrinsic)]);
self.transaction_pool.on_broadcasted(propagated_to);
}
}
let extrinsics = self.transaction_pool.transactions();
fn do_propagate_extrinsics(
&mut self,
extrinsics: &[(H, B::Extrinsic)],
) -> HashMap<H, Vec<String>> {
let mut propagated_to = HashMap::new();
for (who, peer) in self.context_data.peers.iter_mut() {
// never send extrinsics to the light node
@@ -1177,6 +1185,18 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
}
propagated_to
}
/// Call when we must propagate ready extrinsics to peers.
pub fn propagate_extrinsics(&mut self) {
debug!(target: "sync", "Propagating extrinsics");
// Accept transactions only when fully synced
if self.sync.status().state != SyncState::Idle {
return;
}
let extrinsics = self.transaction_pool.transactions();
let propagated_to = self.do_propagate_extrinsics(&extrinsics);
self.transaction_pool.on_broadcasted(propagated_to);
}
+19 -6
View File
@@ -77,6 +77,8 @@ pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
);
/// Notify the pool about transactions broadcast.
fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>);
/// Get transaction by hash.
fn transaction(&self, hash: &H) -> Option<B::Extrinsic>;
}
/// A cloneable handle for reporting cost/benefits of peers.
@@ -115,7 +117,7 @@ pub struct NetworkService<B: BlockT + 'static, S: NetworkSpecialization<B>, H: E
/// nodes it should be connected to or not.
peerset: PeersetHandle,
/// Channel that sends messages to the actual worker.
to_worker: mpsc::UnboundedSender<ServiceToWorkerMsg<B, S>>,
to_worker: mpsc::UnboundedSender<ServiceToWorkerMsg<B, H, S>>,
/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
/// compatibility.
_marker: PhantomData<H>,
@@ -477,14 +479,22 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
});
}
/// You must call this when new transactons are imported by the transaction pool.
/// You may call this when new transactons are imported by the transaction pool.
///
/// The latest transactions will be fetched from the `TransactionPool` that was passed at
/// initialization as part of the configuration.
/// All transactions will be fetched from the `TransactionPool` that was passed at
/// initialization as part of the configuration and propagated to peers.
pub fn trigger_repropagate(&self) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateExtrinsics);
}
/// You must call when new transaction is imported by the transaction pool.
///
/// This transaction will be fetched from the `TransactionPool` that was passed at
/// initialization as part of the configuration and propagated to peers.
pub fn propagate_extrinsic(&self, hash: H) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateExtrinsic(hash));
}
/// Make sure an important block is propagated to peers.
///
/// In chain-based consensus, we often need to make sure non-best forks are
@@ -665,7 +675,8 @@ impl<B, S, H> NetworkStateInfo for NetworkService<B, S, H>
/// Messages sent from the `NetworkService` to the `NetworkWorker`.
///
/// Each entry corresponds to a method of `NetworkService`.
enum ServiceToWorkerMsg<B: BlockT, S: NetworkSpecialization<B>> {
enum ServiceToWorkerMsg<B: BlockT, H: ExHashT, S: NetworkSpecialization<B>> {
PropagateExtrinsic(H),
PropagateExtrinsics,
RequestJustification(B::Hash, NumberFor<B>),
AnnounceBlock(B::Hash, Vec<u8>),
@@ -704,7 +715,7 @@ pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: Ex
/// The import queue that was passed as initialization.
import_queue: Box<dyn ImportQueue<B>>,
/// Messages from the `NetworkService` and that must be processed.
from_worker: mpsc::UnboundedReceiver<ServiceToWorkerMsg<B, S>>,
from_worker: mpsc::UnboundedReceiver<ServiceToWorkerMsg<B, H, S>>,
/// Receiver for queries from the light client that must be processed.
light_client_rqs: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
/// Senders for events that happen on the network.
@@ -747,6 +758,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
this.network_service.user_protocol_mut().announce_block(hash, data),
ServiceToWorkerMsg::RequestJustification(hash, number) =>
this.network_service.user_protocol_mut().request_justification(&hash, number),
ServiceToWorkerMsg::PropagateExtrinsic(hash) =>
this.network_service.user_protocol_mut().propagate_extrinsic(&hash),
ServiceToWorkerMsg::PropagateExtrinsics =>
this.network_service.user_protocol_mut().propagate_extrinsics(),
ServiceToWorkerMsg::GetValue(key) =>
+2
View File
@@ -391,6 +391,8 @@ impl TransactionPool<Hash, Block> for EmptyTransactionPool {
) {}
fn on_broadcasted(&self, _: HashMap<Hash, Vec<String>>) {}
fn transaction(&self, h: &Hash) -> Option<Extrinsic> { None }
}
pub trait SpecializationFactory {
+2 -2
View File
@@ -906,9 +906,9 @@ ServiceBuilder<
let network = Arc::downgrade(&network);
let transaction_pool_ = transaction_pool.clone();
let events = transaction_pool.import_notification_stream()
.for_each(move |_| {
.for_each(move |hash| {
if let Some(network) = network.upgrade() {
network.trigger_repropagate();
network.propagate_extrinsic(hash);
}
let status = transaction_pool_.status();
telemetry!(SUBSTRATE_INFO; "txpool.import";
+5 -1
View File
@@ -608,7 +608,7 @@ where
H: std::hash::Hash + Eq + sp_runtime::traits::Member + sp_runtime::traits::MaybeSerialize,
E: 'static + IntoPoolError + From<sp_transaction_pool::error::Error>,
{
fn transactions(&self) -> Vec<(H, <B as BlockT>::Extrinsic)> {
fn transactions(&self) -> Vec<(H, B::Extrinsic)> {
transactions_to_propagate(&*self.pool)
}
@@ -660,6 +660,10 @@ where
fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>) {
self.pool.on_broadcasted(propagations)
}
fn transaction(&self, hash: &H) -> Option<B::Extrinsic> {
self.pool.ready_transaction(hash).map(|tx| tx.data().clone())
}
}
#[cfg(test)]
@@ -374,9 +374,9 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
///
/// Includes both ready and future pool. For every hash in the `hashes`
/// iterator an `Option` is produced (so the resulting `Vec` always have the same length).
pub fn by_hash(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
let ready = self.ready.by_hash(hashes);
let future = self.future.by_hash(hashes);
pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
let ready = self.ready.by_hashes(hashes);
let future = self.future.by_hashes(hashes);
ready
.into_iter()
@@ -385,6 +385,11 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
.collect()
}
/// Returns pool transaction by hash.
pub fn ready_by_hash(&self, hash: &Hash) -> Option<Arc<Transaction<Hash, Ex>>> {
self.ready.by_hash(hash)
}
/// Makes sure that the transactions in the queues stay within provided limits.
///
/// Removes and returns worst transactions from the queues and all transactions that depend on them.
@@ -160,7 +160,7 @@ impl<Hash: hash::Hash + Eq + Clone, Ex> FutureTransactions<Hash, Ex> {
}
/// Returns a list of known transactions
pub fn by_hash(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
hashes.iter().map(|h| self.waiting.get(h).map(|x| x.transaction.clone())).collect()
}
@@ -38,7 +38,7 @@ use sp_transaction_pool::{error, PoolStatus};
use crate::validated_pool::{ValidatedPool, ValidatedTransaction};
/// Modification notification event stream type;
pub type EventStream = mpsc::UnboundedReceiver<()>;
pub type EventStream<H> = mpsc::UnboundedReceiver<H>;
/// Extrinsic hash type for a pool.
pub type ExHash<A> = <A as ChainApi>::Hash;
@@ -105,11 +105,11 @@ impl Default for Options {
fn default() -> Self {
Options {
ready: base::Limit {
count: 512,
total_bytes: 10 * 1024 * 1024,
count: 8192,
total_bytes: 20 * 1024 * 1024,
},
future: base::Limit {
count: 128,
count: 512,
total_bytes: 1 * 1024 * 1024,
},
reject_future_transactions: false,
@@ -331,7 +331,7 @@ impl<B: ChainApi> Pool<B> {
///
/// Consumers of this stream should use the `ready` method to actually get the
/// pending transactions in the right order.
pub fn import_notification_stream(&self) -> EventStream {
pub fn import_notification_stream(&self) -> EventStream<ExHash<B>> {
self.validated_pool.import_notification_stream()
}
@@ -437,6 +437,11 @@ impl<B: ChainApi> Pool<B> {
(hash, validity)
}
/// Get ready transaction by hash, if it present in the pool.
pub fn ready_transaction(&self, hash: &ExHash<B>) -> Option<TransactionFor<B>> {
self.validated_pool.ready_by_hash(hash)
}
}
impl<B: ChainApi> Clone for Pool<B> {
@@ -638,8 +643,8 @@ mod tests {
// then
let mut it = futures::executor::block_on_stream(stream);
assert_eq!(it.next(), Some(()));
assert_eq!(it.next(), Some(()));
assert_eq!(it.next(), Some(32));
assert_eq!(it.next(), Some(33));
assert_eq!(it.next(), None);
}
@@ -230,8 +230,13 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
self.ready.read().contains_key(hash)
}
/// Retrieve transaction by hash
pub fn by_hash(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
/// Retrive transaction by hash
pub fn by_hash(&self, hash: &Hash) -> Option<Arc<Transaction<Hash, Ex>>> {
self.by_hashes(&[hash.clone()]).into_iter().next().unwrap_or(None)
}
/// Retrieve transactions by hash
pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
let ready = self.ready.read();
hashes.iter().map(|hash| {
ready.get(hash).map(|x| x.transaction.transaction.clone())
@@ -70,7 +70,7 @@ pub(crate) struct ValidatedPool<B: ChainApi> {
ExHash<B>,
ExtrinsicFor<B>,
>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<ExHash<B>>>>,
rotator: PoolRotator<ExHash<B>>,
}
@@ -125,8 +125,8 @@ impl<B: ChainApi> ValidatedPool<B> {
ValidatedTransaction::Valid(tx) => {
let imported = self.pool.write().import(tx)?;
if let base::Imported::Ready { .. } = imported {
self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok());
if let base::Imported::Ready { ref hash, .. } = imported {
self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(hash.clone()).is_ok());
}
let mut listener = self.listener.write();
@@ -320,7 +320,7 @@ impl<B: ChainApi> ValidatedPool<B> {
/// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown).
pub fn extrinsics_tags(&self, hashes: &[ExHash<B>]) -> Vec<Option<Vec<Tag>>> {
self.pool.read().by_hash(&hashes)
self.pool.read().by_hashes(&hashes)
.into_iter()
.map(|existing_in_pool| existing_in_pool
.map(|transaction| transaction.provides.iter().cloned()
@@ -328,6 +328,11 @@ impl<B: ChainApi> ValidatedPool<B> {
.collect()
}
/// Get ready transaction by hash
pub fn ready_by_hash(&self, hash: &ExHash<B>) -> Option<TransactionFor<B>> {
self.pool.read().ready_by_hash(hash)
}
/// Prunes ready transactions that provide given list of tags.
pub fn prune_tags(
&self,
@@ -444,7 +449,7 @@ impl<B: ChainApi> ValidatedPool<B> {
}
/// Return an event stream of transactions imported to the pool.
pub fn import_notification_stream(&self) -> EventStream {
pub fn import_notification_stream(&self) -> EventStream<ExHash<B>> {
let (sink, stream) = mpsc::unbounded();
self.import_notification_sinks.lock().push(sink);
stream
+5 -1
View File
@@ -180,7 +180,7 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
Box::new(self.pool.ready())
}
fn import_notification_stream(&self) -> ImportNotificationStream {
fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
self.pool.import_notification_stream()
}
@@ -191,6 +191,10 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
self.pool.on_broadcasted(propagations)
}
fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
self.pool.ready_transaction(hash)
}
}
#[cfg_attr(test, derive(Debug))]
@@ -113,7 +113,7 @@ pub enum TransactionStatus<Hash, BlockHash> {
pub type TransactionStatusStream<Hash, BlockHash> = dyn Stream<Item=TransactionStatus<Hash, BlockHash>> + Send + Unpin;
/// The import notification event stream.
pub type ImportNotificationStream = mpsc::UnboundedReceiver<()>;
pub type ImportNotificationStream<H> = mpsc::UnboundedReceiver<H>;
/// Transaction hash type for a pool.
pub type TxHash<P> = <P as TransactionPool>::Hash;
@@ -167,7 +167,7 @@ pub trait TransactionPool: Send + Sync {
/// Error type.
type Error: From<crate::error::Error> + crate::error::IntoPoolError;
// Networking
// *** RPC
/// Returns a future that imports a bunch of unverified transactions to the pool.
fn submit_at(
@@ -183,7 +183,6 @@ pub trait TransactionPool: Send + Sync {
xt: TransactionFor<Self>,
) -> PoolFuture<TxHash<Self>, Self::Error>;
// *** RPC
/// Returns a future that import a single transaction and starts to watch their progress in the pool.
fn submit_and_watch(
&self,
@@ -205,7 +204,7 @@ pub trait TransactionPool: Send + Sync {
// *** logging / RPC / networking
/// Return an event stream of transactions imported to the pool.
fn import_notification_stream(&self) -> ImportNotificationStream;
fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>>;
// *** networking
/// Notify the pool about transactions broadcast.
@@ -213,6 +212,9 @@ pub trait TransactionPool: Send + Sync {
/// Returns transaction hash
fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self>;
/// Return specific ready transaction by hash, if there is one.
fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>>;
}
/// Trait for transaction pool maintenance.