diff --git a/substrate/core/finality-grandpa/src/until_imported.rs b/substrate/core/finality-grandpa/src/until_imported.rs index 8cb825ac84..300a69ab23 100644 --- a/substrate/core/finality-grandpa/src/until_imported.rs +++ b/substrate/core/finality-grandpa/src/until_imported.rs @@ -34,6 +34,8 @@ use std::collections::{HashMap, VecDeque}; use std::sync::{atomic::{AtomicUsize, Ordering}, Arc}; use std::time::{Duration, Instant}; +const LOG_PENDING_INTERVAL: Duration = Duration::from_secs(15); + // something which will block until imported. pub(crate) trait BlockUntilImported: Sized { // the type that is blocked on. @@ -65,7 +67,7 @@ pub(crate) struct UntilImported, ready: VecDeque, check_pending: Interval, - pending: HashMap>, + pending: HashMap)>, } impl UntilImported @@ -119,7 +121,8 @@ impl Stream for UntilImported &self.status_check, |target_hash, wait| pending .entry(target_hash) - .or_insert_with(Vec::new) + .or_insert_with(|| (Instant::now(), Vec::new())) + .1 .push(wait), |ready_item| ready.push_back(ready_item), )?; @@ -134,7 +137,7 @@ impl Stream for UntilImported Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), Ok(Async::Ready(Some(notification))) => { // new block imported. queue up all messages tied to that hash. - if let Some(messages) = self.pending.remove(¬ification.hash) { + if let Some((_, messages)) = self.pending.remove(¬ification.hash) { let canon_number = notification.header.number().clone(); let ready_messages = messages.into_iter() .filter_map(|m| m.wait_completed(canon_number)); @@ -153,14 +156,27 @@ impl Stream for UntilImported if update_interval { let mut known_keys = Vec::new(); - for &block_hash in self.pending.keys() { + for (&block_hash, &mut (ref mut last_log, ref v)) in &mut self.pending { if let Some(number) = self.status_check.block_number(block_hash)? { known_keys.push((block_hash, number)); + } else { + let next_log = *last_log + LOG_PENDING_INTERVAL; + if Instant::now() <= next_log { + warn!( + target: "afg", + "Waiting to import block {} before {} votes can be imported. \ + Possible fork?", + block_hash, + v.len(), + ); + + *last_log = next_log; + } } } for (known_hash, canon_number) in known_keys { - if let Some(pending_messages) = self.pending.remove(&known_hash) { + if let Some((_, pending_messages)) = self.pending.remove(&known_hash) { let ready_messages = pending_messages.into_iter() .filter_map(|m| m.wait_completed(canon_number)); @@ -176,6 +192,7 @@ impl Stream for UntilImported if self.import_notifications.is_done() && self.inner.is_done() { Ok(Async::Ready(None)) } else { + Ok(Async::NotReady) } }