diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index d2f67efdf7..c2280f2eaa 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -22,7 +22,6 @@ use std::thread; use std::time::{Duration, Instant}; use std::sync::Arc; -use std::collections::{HashMap, VecDeque}; use futures::{future, Future, Stream, Sink, Async, Canceled, Poll}; use parking_lot::Mutex; use substrate_network as net; @@ -41,7 +40,6 @@ use error; const TIMER_DELAY_MS: u64 = 5000; const TIMER_INTERVAL_MS: u64 = 500; -const MESSAGE_LIFETIME_SEC: u64 = 10; struct BftSink { network: Arc, @@ -49,49 +47,9 @@ struct BftSink { _e: ::std::marker::PhantomData, } -#[derive(Clone)] -struct SharedMessageCollection { - /// Messages for consensus over a block with known hash. Also holds timestamp of the first message. - messages: Arc)>>>, -} - -impl SharedMessageCollection { - fn new() -> SharedMessageCollection { - SharedMessageCollection { - messages: Arc::new(Mutex::new(HashMap::new())), - } - } - - fn select(&self, parent_hash: HeaderHash, stream: net::BftMessageStream, authorities: Vec) -> Messages { - Messages { - messages: self.messages.lock().remove(&parent_hash).map(|(_, m)| m).unwrap_or_else(VecDeque::new), - parent_hash, - network_stream: stream, - authorities: authorities, - collection: self.clone(), - } - } - - fn push(&self, message: net::LocalizedBftMessage) { - self.messages.lock() - .entry(message.parent_hash) - .or_insert_with(|| (Instant::now(), VecDeque::new())) - .1.push_back(message); - } - - fn collect_garbage(&self) { - let expiration = Duration::from_secs(MESSAGE_LIFETIME_SEC); - let now = Instant::now(); - self.messages.lock().retain(|_, &mut (timestamp, _)| timestamp < now + expiration); - } -} - struct Messages { - parent_hash: HeaderHash, - messages: VecDeque, network_stream: net::BftMessageStream, authorities: Vec, - collection: SharedMessageCollection, } impl Stream for Messages { @@ -99,14 +57,6 @@ impl Stream for Messages { type Error = bft::Error; fn poll(&mut self) -> Poll, Self::Error> { - // push buffered messages first - while let Some(message) = self.messages.pop_front() { - match process_message(message, &self.authorities) { - Ok(message) => return Ok(Async::Ready(Some(message))), - Err(e) => debug!("Message validation failed: {:?}", e), - } - } - // check the network loop { match self.network_stream.poll() { @@ -114,15 +64,11 @@ impl Stream for Messages { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end. Ok(Async::Ready(Some(message))) => { - if message.parent_hash == self.parent_hash { - match process_message(message, &self.authorities) { - Ok(message) => return Ok(Async::Ready(Some(message))), - Err(e) => { - debug!("Message validation failed: {:?}", e); - } + match process_message(message, &self.authorities) { + Ok(message) => return Ok(Async::Ready(Some(message))), + Err(e) => { + debug!("Message validation failed: {:?}", e); } - } else { - self.collection.push(message); } } } @@ -226,18 +172,17 @@ fn start_bft( client: &bft::Authorities, network: Arc, bft_service: &BftService, - messages: SharedMessageCollection ) where F: bft::ProposerFactory + 'static, C: bft::BlockImport + bft::Authorities + 'static, ::Error: ::std::fmt::Debug, ::Error: ::std::fmt::Display + Into, { - let hash = header.blake2_256().into(); - if bft_service.live_agreement().map_or(false, |h| h == hash) { + let parent_hash = header.blake2_256().into(); + if bft_service.live_agreement().map_or(false, |h| h == parent_hash) { return; } - let authorities = match client.authorities(&BlockId::Hash(hash)) { + let authorities = match client.authorities(&BlockId::Hash(parent_hash)) { Ok(authorities) => authorities, Err(e) => { debug!("Error reading authorities: {:?}", e); @@ -245,12 +190,16 @@ fn start_bft( } }; - let input = messages.select(hash, network.bft_messages(), authorities).map_err(|e| e.into()); - let output = BftSink { network: network, parent_hash: hash.clone(), _e: Default::default() }; - match bft_service.build_upon(&header, input, output) { + let input = Messages { + network_stream: network.bft_messages(parent_hash), + authorities, + }; + + let output = BftSink { network: network, parent_hash: parent_hash, _e: Default::default() }; + match bft_service.build_upon(&header, input.map_err(Into::into), output) { Ok(Some(bft)) => handle.spawn(bft), Ok(None) => {}, - Err(e) => debug!(target: "bft","BFT agreement error: {:?}", e), + Err(e) => debug!(target: "bft", "BFT agreement error: {:?}", e), } } @@ -281,7 +230,6 @@ impl Service { network: Network(network.clone()), handle: core.handle(), }; - let messages = SharedMessageCollection::new(); let bft_service = Arc::new(BftService::new(client.clone(), key, factory)); let notifications = { @@ -289,11 +237,10 @@ impl Service { let network = network.clone(); let client = client.clone(); let bft_service = bft_service.clone(); - let messages = messages.clone(); client.import_notification_stream().for_each(move |notification| { if notification.is_new_best { - start_bft(¬ification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone()); + start_bft(¬ification.header, handle.clone(), &*client, network.clone(), &*bft_service); } Ok(()) }) @@ -316,16 +263,14 @@ impl Service { let c = client.clone(); let s = bft_service.clone(); let n = network.clone(); - let m = messages.clone(); let handle = core.handle(); interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| { if let Ok(best_block) = c.best_block_header() { let hash = best_block.blake2_256(); - m.collect_garbage(); if hash == prev_best { debug!("Starting consensus round after a timeout"); - start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone()); + start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s); } prev_best = hash; }