Fuse the import queue receiver (#6876)

* Fix the import queue receiver

* Add logging
This commit is contained in:
Pierre Krieger
2020-08-12 10:58:16 +02:00
committed by GitHub
parent 05128c6acf
commit c0ebb00fd3
2 changed files with 34 additions and 12 deletions
@@ -96,7 +96,13 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
} }
trace!(target: "sync", "Scheduling {} blocks for import", blocks.len()); trace!(target: "sync", "Scheduling {} blocks for import", blocks.len());
let _ = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks)); let res = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks));
if res.is_err() {
log::error!(
target: "sync",
"import_blocks: Background import task is no longer alive"
);
}
} }
fn import_justification( fn import_justification(
@@ -106,10 +112,16 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
number: NumberFor<B>, number: NumberFor<B>,
justification: Justification justification: Justification
) { ) {
let _ = self.sender let res = self.sender
.unbounded_send( .unbounded_send(
ToWorkerMsg::ImportJustification(who, hash, number, justification) ToWorkerMsg::ImportJustification(who, hash, number, justification)
); );
if res.is_err() {
log::error!(
target: "sync",
"import_justification: Background import task is no longer alive"
);
}
} }
fn import_finality_proof( fn import_finality_proof(
@@ -120,14 +132,22 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
finality_proof: Vec<u8>, finality_proof: Vec<u8>,
) { ) {
trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash); trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash);
let _ = self.sender let res = self.sender
.unbounded_send( .unbounded_send(
ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof) ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof)
); );
if res.is_err() {
log::error!(
target: "sync",
"import_finality_proof: Background import task is no longer alive"
);
}
} }
fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) { fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) {
self.result_port.poll_actions(cx, link); if self.result_port.poll_actions(cx, link).is_err() {
log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
}
} }
} }
@@ -50,7 +50,7 @@ use crate::import_queue::{Origin, Link, BlockImportResult, BlockImportError};
pub fn buffered_link<B: BlockT>() -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) { pub fn buffered_link<B: BlockT>() -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
let (tx, rx) = tracing_unbounded("mpsc_buffered_link"); let (tx, rx) = tracing_unbounded("mpsc_buffered_link");
let tx = BufferedLinkSender { tx }; let tx = BufferedLinkSender { tx };
let rx = BufferedLinkReceiver { rx }; let rx = BufferedLinkReceiver { rx: rx.fuse() };
(tx, rx) (tx, rx)
} }
@@ -127,7 +127,7 @@ impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
/// See [`buffered_link`]. /// See [`buffered_link`].
pub struct BufferedLinkReceiver<B: BlockT> { pub struct BufferedLinkReceiver<B: BlockT> {
rx: TracingUnboundedReceiver<BlockImportWorkerMsg<B>>, rx: stream::Fuse<TracingUnboundedReceiver<BlockImportWorkerMsg<B>>>,
} }
impl<B: BlockT> BufferedLinkReceiver<B> { impl<B: BlockT> BufferedLinkReceiver<B> {
@@ -137,12 +137,14 @@ impl<B: BlockT> BufferedLinkReceiver<B> {
/// This method should behave in a way similar to `Future::poll`. It can register the current /// This method should behave in a way similar to `Future::poll`. It can register the current
/// task and notify later when more actions are ready to be polled. To continue the comparison, /// task and notify later when more actions are ready to be polled. To continue the comparison,
/// it is as if this method always returned `Poll::Pending`. /// it is as if this method always returned `Poll::Pending`.
pub fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) { ///
/// Returns an error if the corresponding [`BufferedLinkSender`] has been closed.
pub fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) -> Result<(), ()> {
loop { loop {
let msg = if let Poll::Ready(Some(msg)) = Stream::poll_next(Pin::new(&mut self.rx), cx) { let msg = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
msg Poll::Ready(Some(msg)) => msg,
} else { Poll::Ready(None) => break Err(()),
break Poll::Pending => break Ok(()),
}; };
match msg { match msg {
@@ -162,7 +164,7 @@ impl<B: BlockT> BufferedLinkReceiver<B> {
/// Close the channel. /// Close the channel.
pub fn close(&mut self) { pub fn close(&mut self) {
self.rx.close() self.rx.get_mut().close()
} }
} }