log when it takes a while to import forks we need from GRANDPA (#1533)

This commit is contained in:
Robert Habermeier
2019-01-23 12:22:24 -03:00
committed by GitHub
parent dd88dc6cd6
commit 7c1f92b26f
@@ -34,6 +34,8 @@ use std::collections::{HashMap, VecDeque};
use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
use std::time::{Duration, Instant};
const LOG_PENDING_INTERVAL: Duration = Duration::from_secs(15);
// something which will block until imported.
pub(crate) trait BlockUntilImported<Block: BlockT>: Sized {
// the type that is blocked on.
@@ -65,7 +67,7 @@ pub(crate) struct UntilImported<Block: BlockT, Status, I, M: BlockUntilImported<
inner: Fuse<I>,
ready: VecDeque<M::Blocked>,
check_pending: Interval,
pending: HashMap<Block::Hash, Vec<M>>,
pending: HashMap<Block::Hash, (Instant, Vec<M>)>,
}
impl<Block: BlockT, Status, I: Stream, M> UntilImported<Block, Status, I, M>
@@ -119,7 +121,8 @@ impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M>
&self.status_check,
|target_hash, wait| pending
.entry(target_hash)
.or_insert_with(Vec::new)
.or_insert_with(|| (Instant::now(), Vec::new()))
.1
.push(wait),
|ready_item| ready.push_back(ready_item),
)?;
@@ -134,7 +137,7 @@ impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M>
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
Ok(Async::Ready(Some(notification))) => {
// new block imported. queue up all messages tied to that hash.
if let Some(messages) = self.pending.remove(&notification.hash) {
if let Some((_, messages)) = self.pending.remove(&notification.hash) {
let canon_number = notification.header.number().clone();
let ready_messages = messages.into_iter()
.filter_map(|m| m.wait_completed(canon_number));
@@ -153,14 +156,27 @@ impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M>
if update_interval {
let mut known_keys = Vec::new();
for &block_hash in self.pending.keys() {
for (&block_hash, &mut (ref mut last_log, ref v)) in &mut self.pending {
if let Some(number) = self.status_check.block_number(block_hash)? {
known_keys.push((block_hash, number));
} else {
let next_log = *last_log + LOG_PENDING_INTERVAL;
if Instant::now() <= next_log {
warn!(
target: "afg",
"Waiting to import block {} before {} votes can be imported. \
Possible fork?",
block_hash,
v.len(),
);
*last_log = next_log;
}
}
}
for (known_hash, canon_number) in known_keys {
if let Some(pending_messages) = self.pending.remove(&known_hash) {
if let Some((_, pending_messages)) = self.pending.remove(&known_hash) {
let ready_messages = pending_messages.into_iter()
.filter_map(|m| m.wait_completed(canon_number));
@@ -176,6 +192,7 @@ impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M>
if self.import_notifications.is_done() && self.inner.is_done() {
Ok(Async::Ready(None))
} else {
Ok(Async::NotReady)
}
}