Fixed consensus message garbage collection (#191)

* Fixed garbage collection logic

* Track block being imported

* mod tests
This commit is contained in:
Arkadiy Paronyan
2018-05-31 18:17:18 +02:00
committed by Gav Wood
parent 66affa2b95
commit 32d85fbecf
3 changed files with 119 additions and 44 deletions
+33 -8
View File
@@ -18,9 +18,9 @@
use std::sync::Arc;
use futures::sync::mpsc;
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use primitives::{self, block, AuthorityId};
use primitives::block::Id as BlockId;
use primitives::block::{Id as BlockId, HeaderHash};
use primitives::storage::{StorageKey, StorageData};
use runtime_support::Hashable;
use codec::Slicable;
@@ -45,6 +45,8 @@ pub struct Client<B, E> {
backend: Arc<B>,
executor: E,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification>>>,
import_lock: Mutex<()>,
importing_block: RwLock<Option<HeaderHash>>, // holds the block hash currently being imported. TODO: replace this with block queue
}
/// A source of blockchain evenets.
@@ -183,6 +185,8 @@ impl<B, E> Client<B, E> where
backend,
executor,
import_notification_sinks: Mutex::new(Vec::new()),
import_lock: Mutex::new(()),
importing_block: RwLock::new(None),
})
}
@@ -283,13 +287,32 @@ impl<B, E> Client<B, E> where
header: JustifiedHeader,
body: Option<block::Body>,
) -> error::Result<ImportResult> {
// TODO: import lock
// TODO: import justification.
let (header, justification) = header.into_inner();
match self.backend.blockchain().status(BlockId::Hash(header.parent_hash))? {
blockchain::BlockStatus::InChain => (),
blockchain::BlockStatus::InChain => {},
blockchain::BlockStatus::Unknown => return Ok(ImportResult::UnknownParent),
}
let hash: block::HeaderHash = header.blake2_256().into();
let _import_lock = self.import_lock.lock();
*self.importing_block.write() = Some(hash);
let result = self.execute_and_import_block(origin, hash, header, justification, body);
*self.importing_block.write() = None;
result
}
fn execute_and_import_block(
&self,
origin: BlockOrigin,
hash: HeaderHash,
header: block::Header,
justification: bft::Justification,
body: Option<block::Body>,
) -> error::Result<ImportResult> {
match self.backend.blockchain().status(BlockId::Hash(hash))? {
blockchain::BlockStatus::InChain => return Ok(ImportResult::AlreadyInChain),
blockchain::BlockStatus::Unknown => {},
}
let mut transaction = self.backend.begin_operation(BlockId::Hash(header.parent_hash))?;
let storage_update = match transaction.state()? {
@@ -308,14 +331,12 @@ impl<B, E> Client<B, E> where
};
let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1;
let hash: block::HeaderHash = header.blake2_256().into();
trace!("Imported {}, (#{}), best={}, origin={:?}", hash, header.number, is_new_best, origin);
transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?;
if let Some(storage_update) = storage_update {
transaction.update_storage(storage_update)?;
}
self.backend.commit_operation(transaction)?;
if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast {
let notification = BlockImportNotification {
hash: hash,
@@ -325,7 +346,6 @@ impl<B, E> Client<B, E> where
};
self.import_notification_sinks.lock().retain(|sink| !sink.unbounded_send(notification.clone()).is_err());
}
Ok(ImportResult::Queued)
}
@@ -342,6 +362,11 @@ impl<B, E> Client<B, E> where
/// Get block status.
pub fn block_status(&self, id: &BlockId) -> error::Result<BlockStatus> {
// TODO: more efficient implementation
if let BlockId::Hash(ref h) = id {
if self.importing_block.read().as_ref().map_or(false, |importing| h == importing) {
return Ok(BlockStatus::Queued);
}
}
match self.backend.blockchain().header(*id).map_err(|e| error::Error::from_blockchain(Box::new(e)))?.is_some() {
true => Ok(BlockStatus::InChain),
false => Ok(BlockStatus::Unknown),
+84 -33
View File
@@ -22,12 +22,12 @@ use std::time::{Instant, Duration};
use io::SyncIo;
use protocol::Protocol;
use network::PeerId;
use primitives::{Hash, block::HeaderHash, block::Id as BlockId, block::Header};
use primitives::{Hash, block::Id as BlockId, block::Header};
use message::{self, Message};
use runtime_support::Hashable;
// TODO: Add additional spam/DoS attack protection.
const MESSAGE_LIFETIME_SECONDS: u64 = 600;
const MESSAGE_LIFETIME: Duration = Duration::from_secs(600);
struct CandidateRequest {
id: message::RequestId,
@@ -48,12 +48,11 @@ pub struct Consensus {
bft_message_sink: Option<(mpsc::UnboundedSender<message::LocalizedBftMessage>, Hash)>,
messages: Vec<(Hash, Instant, message::Message)>,
message_hashes: HashSet<Hash>,
last_block_hash: HeaderHash,
}
impl Consensus {
/// Create a new instance.
pub fn new(best_hash: HeaderHash) -> Consensus {
pub fn new() -> Consensus {
Consensus {
peers: HashMap::new(),
our_candidate: None,
@@ -61,7 +60,6 @@ impl Consensus {
bft_message_sink: None,
messages: Default::default(),
message_hashes: Default::default(),
last_block_hash: best_hash,
}
}
@@ -283,37 +281,25 @@ impl Consensus {
self.peers.remove(&peer_id);
}
pub fn collect_garbage(&mut self, best_hash_and_header: Option<(HeaderHash, &Header)>) {
pub fn collect_garbage(&mut self, best_header: Option<&Header>) {
let hashes = &mut self.message_hashes;
let last_block_hash = &mut self.last_block_hash;
let before = self.messages.len();
let (best_hash, best_header) = best_hash_and_header.map(|(h, header)| (Some(h), Some(header))).unwrap_or((None, None));
if best_header.as_ref().map_or(false, |header| header.parent_hash != *last_block_hash) {
trace!(target:"sync", "Clearing conensus message cache");
self.messages.clear();
hashes.clear();
} else {
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS);
let now = Instant::now();
if let Some(hash) = best_hash {
*last_block_hash = hash;
}
self.messages.retain(|&(ref hash, timestamp, ref message)| {
timestamp < now + expiration ||
best_header.map_or(true, |header| {
if match *message {
Message::BftMessage(ref msg) => msg.parent_hash != header.parent_hash,
Message::Statement(ref msg) => msg.parent_hash != header.parent_hash,
_ => true,
} {
hashes.remove(hash);
true
} else {
false
}
let now = Instant::now();
self.messages.retain(|&(ref hash, timestamp, ref message)| {
if timestamp >= now - MESSAGE_LIFETIME &&
best_header.map_or(true, |header|
match *message {
Message::BftMessage(ref msg) => msg.parent_hash != header.parent_hash,
Message::Statement(ref msg) => msg.parent_hash != header.parent_hash,
_ => true,
})
});
}
{
true
} else {
hashes.remove(hash);
false
}
});
if self.messages.len() != before {
trace!(target:"sync", "Cleaned up {} stale messages", before - self.messages.len());
}
@@ -322,3 +308,68 @@ impl Consensus {
}
}
}
#[cfg(test)]
mod tests {
use primitives::Hash;
use primitives::bft::Justification;
use primitives::block::{HeaderHash, Header};
use std::time::Instant;
use message::{self, Message};
use super::{Consensus, MESSAGE_LIFETIME};
#[test]
fn collects_garbage() {
let prev_hash = HeaderHash::random();
let best_hash = HeaderHash::random();
let mut consensus = Consensus::new();
let now = Instant::now();
let m1_hash = Hash::random();
let m2_hash = Hash::random();
let m1 = Message::BftMessage(message::LocalizedBftMessage {
parent_hash: prev_hash,
message: message::BftMessage::Auxiliary(Justification {
round_number: 0,
hash: Default::default(),
signatures: Default::default(),
}),
});
let m2 = Message::BftMessage(message::LocalizedBftMessage {
parent_hash: best_hash,
message: message::BftMessage::Auxiliary(Justification {
round_number: 0,
hash: Default::default(),
signatures: Default::default(),
}),
});
consensus.messages.push((m1_hash, now, m1));
consensus.messages.push((m2_hash, now, m2.clone()));
consensus.message_hashes.insert(m1_hash);
consensus.message_hashes.insert(m2_hash);
// nothing to collect
consensus.collect_garbage(None);
assert_eq!(consensus.messages.len(), 2);
assert_eq!(consensus.message_hashes.len(), 2);
// random header, nothing should be cleared
let mut header = Header::from_block_number(0);
consensus.collect_garbage(Some(&header));
assert_eq!(consensus.messages.len(), 2);
assert_eq!(consensus.message_hashes.len(), 2);
// header that matches one of the messages
header.parent_hash = prev_hash;
consensus.collect_garbage(Some(&header));
assert_eq!(consensus.messages.len(), 1);
assert_eq!(consensus.message_hashes.len(), 1);
assert!(consensus.message_hashes.contains(&m2_hash));
// make timestamp expired
consensus.messages.clear();
consensus.messages.push((m2_hash, now - MESSAGE_LIFETIME, m2));
consensus.collect_garbage(None);
assert!(consensus.messages.is_empty());
assert!(consensus.message_hashes.is_empty());
}
}
+2 -3
View File
@@ -116,7 +116,6 @@ impl Protocol {
/// Create a new instance.
pub fn new(config: ProtocolConfig, chain: Arc<Client>, on_demand: Option<Arc<OnDemandService>>, transaction_pool: Arc<TransactionPool>) -> error::Result<Protocol> {
let info = chain.info()?;
let best_hash = info.chain.best_hash;
let sync = ChainSync::new(config.roles, &info);
let protocol = Protocol {
config: config,
@@ -124,7 +123,7 @@ impl Protocol {
on_demand: on_demand,
genesis_hash: info.chain.genesis_hash,
sync: RwLock::new(sync),
consensus: Mutex::new(Consensus::new(best_hash)),
consensus: Mutex::new(Consensus::new()),
peers: RwLock::new(HashMap::new()),
handshaking_peers: RwLock::new(HashMap::new()),
transaction_pool: transaction_pool,
@@ -528,7 +527,7 @@ impl Protocol {
}
}
self.consensus.lock().collect_garbage(Some((hash, &header)));
self.consensus.lock().collect_garbage(Some(&header));
}
fn on_remote_call_request(&self, io: &mut SyncIo, peer_id: PeerId, request: message::RemoteCallRequest) {