Make unbounded channels size warning exact (part 2) (#13504)

This commit is contained in:
Dmitry Markin
2023-03-07 10:57:26 +03:00
committed by GitHub
parent 58d1d9e117
commit 3118026576
14 changed files with 216 additions and 347 deletions
@@ -118,8 +118,8 @@ impl<B: BlockT> BasicQueueHandle<B> {
}
pub fn close(&mut self) {
self.justification_sender.close_channel();
self.block_import_sender.close_channel();
self.justification_sender.close();
self.block_import_sender.close();
}
}
@@ -597,11 +597,11 @@ mod tests {
fn prioritizes_finality_work_over_block_import() {
let (result_sender, mut result_port) = buffered_link::buffered_link(100_000);
let (worker, mut finality_sender, mut block_import_sender) =
let (worker, finality_sender, block_import_sender) =
BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None);
futures::pin_mut!(worker);
let mut import_block = |n| {
let import_block = |n| {
let header = Header {
parent_hash: Hash::random(),
number: n,
@@ -612,35 +612,37 @@ mod tests {
let hash = header.hash();
block_on(block_import_sender.send(worker_messages::ImportBlocks(
BlockOrigin::Own,
vec![IncomingBlock {
hash,
header: Some(header),
body: None,
indexed_body: None,
justifications: None,
origin: None,
allow_missing_state: false,
import_existing: false,
state: None,
skip_execution: false,
}],
)))
.unwrap();
block_import_sender
.unbounded_send(worker_messages::ImportBlocks(
BlockOrigin::Own,
vec![IncomingBlock {
hash,
header: Some(header),
body: None,
indexed_body: None,
justifications: None,
origin: None,
allow_missing_state: false,
import_existing: false,
state: None,
skip_execution: false,
}],
))
.unwrap();
hash
};
let mut import_justification = || {
let import_justification = || {
let hash = Hash::random();
block_on(finality_sender.send(worker_messages::ImportJustification(
libp2p::PeerId::random(),
hash,
1,
(*b"TEST", Vec::new()),
)))
.unwrap();
finality_sender
.unbounded_send(worker_messages::ImportJustification(
libp2p::PeerId::random(),
hash,
1,
(*b"TEST", Vec::new()),
))
.unwrap();
hash
};
@@ -53,7 +53,7 @@ use super::BlockImportResult;
/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer
/// them to another link. `queue_size_warning` sets the warning threshold of the channel queue size.
pub fn buffered_link<B: BlockT>(
queue_size_warning: i64,
queue_size_warning: usize,
) -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning);
let tx = BufferedLinkSender { tx };
@@ -166,7 +166,7 @@ impl<B: BlockT> BufferedLinkReceiver<B> {
}
/// Close the channel.
pub fn close(&mut self) {
pub fn close(&mut self) -> bool {
self.rx.get_mut().close()
}
}