diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index 7a9fc7ecfb..2df8f6597c 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -1135,18 +1135,26 @@ impl, H: ExHashT> Protocol { } } - /// Call when we must propagate ready extrinsics to peers. - pub fn propagate_extrinsics( + /// Propagate one extrinsic. + pub fn propagate_extrinsic( &mut self, + hash: &H, ) { - debug!(target: "sync", "Propagating extrinsics"); - + debug!(target: "sync", "Propagating extrinsic [{:?}]", hash); // Accept transactions only when fully synced if self.sync.status().state != SyncState::Idle { return; } + if let Some(extrinsic) = self.transaction_pool.transaction(hash) { + let propagated_to = self.do_propagate_extrinsics(&[(hash.clone(), extrinsic)]); + self.transaction_pool.on_broadcasted(propagated_to); + } + } - let extrinsics = self.transaction_pool.transactions(); + fn do_propagate_extrinsics( + &mut self, + extrinsics: &[(H, B::Extrinsic)], + ) -> HashMap> { let mut propagated_to = HashMap::new(); for (who, peer) in self.context_data.peers.iter_mut() { // never send extrinsics to the light node @@ -1177,6 +1185,18 @@ impl, H: ExHashT> Protocol { } } + propagated_to + } + + /// Call when we must propagate ready extrinsics to peers. + pub fn propagate_extrinsics(&mut self) { + debug!(target: "sync", "Propagating extrinsics"); + // Accept transactions only when fully synced + if self.sync.status().state != SyncState::Idle { + return; + } + let extrinsics = self.transaction_pool.transactions(); + let propagated_to = self.do_propagate_extrinsics(&extrinsics); self.transaction_pool.on_broadcasted(propagated_to); } diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 9c9a87bb67..c6b6168126 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -77,6 +77,8 @@ pub trait TransactionPool: Send + Sync { ); /// Notify the pool about transactions broadcast. fn on_broadcasted(&self, propagations: HashMap>); + /// Get transaction by hash. + fn transaction(&self, hash: &H) -> Option; } /// A cloneable handle for reporting cost/benefits of peers. @@ -115,7 +117,7 @@ pub struct NetworkService, H: E /// nodes it should be connected to or not. peerset: PeersetHandle, /// Channel that sends messages to the actual worker. - to_worker: mpsc::UnboundedSender>, + to_worker: mpsc::UnboundedSender>, /// Marker to pin the `H` generic. Serves no purpose except to not break backwards /// compatibility. _marker: PhantomData, @@ -477,14 +479,22 @@ impl, H: ExHashT> NetworkServic }); } - /// You must call this when new transactons are imported by the transaction pool. + /// You may call this when new transactons are imported by the transaction pool. /// - /// The latest transactions will be fetched from the `TransactionPool` that was passed at - /// initialization as part of the configuration. + /// All transactions will be fetched from the `TransactionPool` that was passed at + /// initialization as part of the configuration and propagated to peers. pub fn trigger_repropagate(&self) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateExtrinsics); } + /// You must call when new transaction is imported by the transaction pool. + /// + /// This transaction will be fetched from the `TransactionPool` that was passed at + /// initialization as part of the configuration and propagated to peers. + pub fn propagate_extrinsic(&self, hash: H) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateExtrinsic(hash)); + } + /// Make sure an important block is propagated to peers. /// /// In chain-based consensus, we often need to make sure non-best forks are @@ -665,7 +675,8 @@ impl NetworkStateInfo for NetworkService /// Messages sent from the `NetworkService` to the `NetworkWorker`. /// /// Each entry corresponds to a method of `NetworkService`. -enum ServiceToWorkerMsg> { +enum ServiceToWorkerMsg> { + PropagateExtrinsic(H), PropagateExtrinsics, RequestJustification(B::Hash, NumberFor), AnnounceBlock(B::Hash, Vec), @@ -704,7 +715,7 @@ pub struct NetworkWorker, H: Ex /// The import queue that was passed as initialization. import_queue: Box>, /// Messages from the `NetworkService` and that must be processed. - from_worker: mpsc::UnboundedReceiver>, + from_worker: mpsc::UnboundedReceiver>, /// Receiver for queries from the light client that must be processed. light_client_rqs: Option>>, /// Senders for events that happen on the network. @@ -747,6 +758,8 @@ impl, H: ExHashT> Future for Ne this.network_service.user_protocol_mut().announce_block(hash, data), ServiceToWorkerMsg::RequestJustification(hash, number) => this.network_service.user_protocol_mut().request_justification(&hash, number), + ServiceToWorkerMsg::PropagateExtrinsic(hash) => + this.network_service.user_protocol_mut().propagate_extrinsic(&hash), ServiceToWorkerMsg::PropagateExtrinsics => this.network_service.user_protocol_mut().propagate_extrinsics(), ServiceToWorkerMsg::GetValue(key) => diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 2369ba4f22..874c4d19f9 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -391,6 +391,8 @@ impl TransactionPool for EmptyTransactionPool { ) {} fn on_broadcasted(&self, _: HashMap>) {} + + fn transaction(&self, h: &Hash) -> Option { None } } pub trait SpecializationFactory { diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index caf97438ad..202597e787 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -906,9 +906,9 @@ ServiceBuilder< let network = Arc::downgrade(&network); let transaction_pool_ = transaction_pool.clone(); let events = transaction_pool.import_notification_stream() - .for_each(move |_| { + .for_each(move |hash| { if let Some(network) = network.upgrade() { - network.trigger_repropagate(); + network.propagate_extrinsic(hash); } let status = transaction_pool_.status(); telemetry!(SUBSTRATE_INFO; "txpool.import"; diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index 57e1462f64..c7069ac6e5 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -608,7 +608,7 @@ where H: std::hash::Hash + Eq + sp_runtime::traits::Member + sp_runtime::traits::MaybeSerialize, E: 'static + IntoPoolError + From, { - fn transactions(&self) -> Vec<(H, ::Extrinsic)> { + fn transactions(&self) -> Vec<(H, B::Extrinsic)> { transactions_to_propagate(&*self.pool) } @@ -660,6 +660,10 @@ where fn on_broadcasted(&self, propagations: HashMap>) { self.pool.on_broadcasted(propagations) } + + fn transaction(&self, hash: &H) -> Option { + self.pool.ready_transaction(hash).map(|tx| tx.data().clone()) + } } #[cfg(test)] diff --git a/substrate/client/transaction-pool/graph/src/base_pool.rs b/substrate/client/transaction-pool/graph/src/base_pool.rs index 1da731b71e..7b7900c3e9 100644 --- a/substrate/client/transaction-pool/graph/src/base_pool.rs +++ b/substrate/client/transaction-pool/graph/src/base_pool.rs @@ -374,9 +374,9 @@ impl BasePool Vec>>> { - let ready = self.ready.by_hash(hashes); - let future = self.future.by_hash(hashes); + pub fn by_hashes(&self, hashes: &[Hash]) -> Vec>>> { + let ready = self.ready.by_hashes(hashes); + let future = self.future.by_hashes(hashes); ready .into_iter() @@ -385,6 +385,11 @@ impl BasePool Option>> { + self.ready.by_hash(hash) + } + /// Makes sure that the transactions in the queues stay within provided limits. /// /// Removes and returns worst transactions from the queues and all transactions that depend on them. diff --git a/substrate/client/transaction-pool/graph/src/future.rs b/substrate/client/transaction-pool/graph/src/future.rs index d106c65d45..0de50c1a65 100644 --- a/substrate/client/transaction-pool/graph/src/future.rs +++ b/substrate/client/transaction-pool/graph/src/future.rs @@ -160,7 +160,7 @@ impl FutureTransactions { } /// Returns a list of known transactions - pub fn by_hash(&self, hashes: &[Hash]) -> Vec>>> { + pub fn by_hashes(&self, hashes: &[Hash]) -> Vec>>> { hashes.iter().map(|h| self.waiting.get(h).map(|x| x.transaction.clone())).collect() } diff --git a/substrate/client/transaction-pool/graph/src/pool.rs b/substrate/client/transaction-pool/graph/src/pool.rs index d0f14ad40a..815b5871ea 100644 --- a/substrate/client/transaction-pool/graph/src/pool.rs +++ b/substrate/client/transaction-pool/graph/src/pool.rs @@ -38,7 +38,7 @@ use sp_transaction_pool::{error, PoolStatus}; use crate::validated_pool::{ValidatedPool, ValidatedTransaction}; /// Modification notification event stream type; -pub type EventStream = mpsc::UnboundedReceiver<()>; +pub type EventStream = mpsc::UnboundedReceiver; /// Extrinsic hash type for a pool. pub type ExHash = ::Hash; @@ -105,11 +105,11 @@ impl Default for Options { fn default() -> Self { Options { ready: base::Limit { - count: 512, - total_bytes: 10 * 1024 * 1024, + count: 8192, + total_bytes: 20 * 1024 * 1024, }, future: base::Limit { - count: 128, + count: 512, total_bytes: 1 * 1024 * 1024, }, reject_future_transactions: false, @@ -331,7 +331,7 @@ impl Pool { /// /// Consumers of this stream should use the `ready` method to actually get the /// pending transactions in the right order. - pub fn import_notification_stream(&self) -> EventStream { + pub fn import_notification_stream(&self) -> EventStream> { self.validated_pool.import_notification_stream() } @@ -437,6 +437,11 @@ impl Pool { (hash, validity) } + + /// Get ready transaction by hash, if it present in the pool. + pub fn ready_transaction(&self, hash: &ExHash) -> Option> { + self.validated_pool.ready_by_hash(hash) + } } impl Clone for Pool { @@ -638,8 +643,8 @@ mod tests { // then let mut it = futures::executor::block_on_stream(stream); - assert_eq!(it.next(), Some(())); - assert_eq!(it.next(), Some(())); + assert_eq!(it.next(), Some(32)); + assert_eq!(it.next(), Some(33)); assert_eq!(it.next(), None); } diff --git a/substrate/client/transaction-pool/graph/src/ready.rs b/substrate/client/transaction-pool/graph/src/ready.rs index cdb0076e5a..ec8d66e6b9 100644 --- a/substrate/client/transaction-pool/graph/src/ready.rs +++ b/substrate/client/transaction-pool/graph/src/ready.rs @@ -230,8 +230,13 @@ impl ReadyTransactions { self.ready.read().contains_key(hash) } - /// Retrieve transaction by hash - pub fn by_hash(&self, hashes: &[Hash]) -> Vec>>> { + /// Retrive transaction by hash + pub fn by_hash(&self, hash: &Hash) -> Option>> { + self.by_hashes(&[hash.clone()]).into_iter().next().unwrap_or(None) + } + + /// Retrieve transactions by hash + pub fn by_hashes(&self, hashes: &[Hash]) -> Vec>>> { let ready = self.ready.read(); hashes.iter().map(|hash| { ready.get(hash).map(|x| x.transaction.transaction.clone()) diff --git a/substrate/client/transaction-pool/graph/src/validated_pool.rs b/substrate/client/transaction-pool/graph/src/validated_pool.rs index 34f34d5806..44e51a7a61 100644 --- a/substrate/client/transaction-pool/graph/src/validated_pool.rs +++ b/substrate/client/transaction-pool/graph/src/validated_pool.rs @@ -70,7 +70,7 @@ pub(crate) struct ValidatedPool { ExHash, ExtrinsicFor, >>, - import_notification_sinks: Mutex>>, + import_notification_sinks: Mutex>>>, rotator: PoolRotator>, } @@ -125,8 +125,8 @@ impl ValidatedPool { ValidatedTransaction::Valid(tx) => { let imported = self.pool.write().import(tx)?; - if let base::Imported::Ready { .. } = imported { - self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok()); + if let base::Imported::Ready { ref hash, .. } = imported { + self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(hash.clone()).is_ok()); } let mut listener = self.listener.write(); @@ -320,7 +320,7 @@ impl ValidatedPool { /// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown). pub fn extrinsics_tags(&self, hashes: &[ExHash]) -> Vec>> { - self.pool.read().by_hash(&hashes) + self.pool.read().by_hashes(&hashes) .into_iter() .map(|existing_in_pool| existing_in_pool .map(|transaction| transaction.provides.iter().cloned() @@ -328,6 +328,11 @@ impl ValidatedPool { .collect() } + /// Get ready transaction by hash + pub fn ready_by_hash(&self, hash: &ExHash) -> Option> { + self.pool.read().ready_by_hash(hash) + } + /// Prunes ready transactions that provide given list of tags. pub fn prune_tags( &self, @@ -444,7 +449,7 @@ impl ValidatedPool { } /// Return an event stream of transactions imported to the pool. - pub fn import_notification_stream(&self) -> EventStream { + pub fn import_notification_stream(&self) -> EventStream> { let (sink, stream) = mpsc::unbounded(); self.import_notification_sinks.lock().push(sink); stream diff --git a/substrate/client/transaction-pool/src/lib.rs b/substrate/client/transaction-pool/src/lib.rs index b97294abe1..5f116dfd02 100644 --- a/substrate/client/transaction-pool/src/lib.rs +++ b/substrate/client/transaction-pool/src/lib.rs @@ -180,7 +180,7 @@ impl TransactionPool for BasicPool Box::new(self.pool.ready()) } - fn import_notification_stream(&self) -> ImportNotificationStream { + fn import_notification_stream(&self) -> ImportNotificationStream> { self.pool.import_notification_stream() } @@ -191,6 +191,10 @@ impl TransactionPool for BasicPool fn on_broadcasted(&self, propagations: HashMap, Vec>) { self.pool.on_broadcasted(propagations) } + + fn ready_transaction(&self, hash: &TxHash) -> Option> { + self.pool.ready_transaction(hash) + } } #[cfg_attr(test, derive(Debug))] diff --git a/substrate/primitives/transaction-pool/src/pool.rs b/substrate/primitives/transaction-pool/src/pool.rs index 92a0c4b5c3..2a71773996 100644 --- a/substrate/primitives/transaction-pool/src/pool.rs +++ b/substrate/primitives/transaction-pool/src/pool.rs @@ -113,7 +113,7 @@ pub enum TransactionStatus { pub type TransactionStatusStream = dyn Stream> + Send + Unpin; /// The import notification event stream. -pub type ImportNotificationStream = mpsc::UnboundedReceiver<()>; +pub type ImportNotificationStream = mpsc::UnboundedReceiver; /// Transaction hash type for a pool. pub type TxHash

=

::Hash; @@ -167,7 +167,7 @@ pub trait TransactionPool: Send + Sync { /// Error type. type Error: From + crate::error::IntoPoolError; - // Networking + // *** RPC /// Returns a future that imports a bunch of unverified transactions to the pool. fn submit_at( @@ -183,7 +183,6 @@ pub trait TransactionPool: Send + Sync { xt: TransactionFor, ) -> PoolFuture, Self::Error>; - // *** RPC /// Returns a future that import a single transaction and starts to watch their progress in the pool. fn submit_and_watch( &self, @@ -205,7 +204,7 @@ pub trait TransactionPool: Send + Sync { // *** logging / RPC / networking /// Return an event stream of transactions imported to the pool. - fn import_notification_stream(&self) -> ImportNotificationStream; + fn import_notification_stream(&self) -> ImportNotificationStream>; // *** networking /// Notify the pool about transactions broadcast. @@ -213,6 +212,9 @@ pub trait TransactionPool: Send + Sync { /// Returns transaction hash fn hash_of(&self, xt: &TransactionFor) -> TxHash; + + /// Return specific ready transaction by hash, if there is one. + fn ready_transaction(&self, hash: &TxHash) -> Option>; } /// Trait for transaction pool maintenance.