Switch to bounded mpsc for txpool import notification stream (#6640)

* Switch to bounded mpsc for txpool import notification stream

* Update client/transaction-pool/graph/src/validated_pool.rs

Co-authored-by: Nikolay Volf <nikvolf@gmail.com>

Co-authored-by: Nikolay Volf <nikvolf@gmail.com>
This commit is contained in:
Wei Tang
2020-07-17 12:31:47 +02:00
committed by GitHub
parent 85e1f9aa8d
commit 8ae1aa4c28
6 changed files with 30 additions and 11 deletions
@@ -36,7 +36,8 @@ use sp_runtime::{
};
use sp_transaction_pool::{error, PoolStatus};
use wasm_timer::Instant;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use futures::channel::mpsc::{channel, Sender};
use retain_mut::RetainMut;
use crate::base_pool::PruneStatus;
use crate::pool::{
@@ -98,7 +99,7 @@ pub struct ValidatedPool<B: ChainApi> {
ExtrinsicHash<B>,
ExtrinsicFor<B>,
>>,
import_notification_sinks: Mutex<Vec<TracingUnboundedSender<ExtrinsicHash<B>>>>,
import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>,
rotator: PoolRotator<ExtrinsicHash<B>>,
}
@@ -186,7 +187,19 @@ impl<B: ChainApi> ValidatedPool<B> {
if let base::Imported::Ready { ref hash, .. } = imported {
self.import_notification_sinks.lock()
.retain(|sink| sink.unbounded_send(hash.clone()).is_ok());
.retain_mut(|sink| {
match sink.try_send(hash.clone()) {
Ok(()) => true,
Err(e) => {
if e.is_full() {
log::warn!(target: "txpool", "[{:?}] Trying to notify an import but the channel is full", hash);
true
} else {
false
}
},
}
});
}
let mut listener = self.listener.write();
@@ -529,7 +542,9 @@ impl<B: ChainApi> ValidatedPool<B> {
/// Consumers of this stream should use the `ready` method to actually get the
/// pending transactions in the right order.
pub fn import_notification_stream(&self) -> EventStream<ExtrinsicHash<B>> {
let (sink, stream) = tracing_unbounded("mpsc_import_notifications");
const CHANNEL_BUFFER_SIZE: usize = 1024;
let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
self.import_notification_sinks.lock().push(sink);
stream
}