Networking and backend fixes (#155)

* Networking and backend fixes

* Fixed test

* Reverted DB fix

* Reverted DB fix properly

* Preserve messages with unknown parent_hash
This commit is contained in:
Arkadiy Paronyan
2018-05-14 12:48:14 +02:00
committed by Robert Habermeier
parent 8068fdd2af
commit 8bfded8d72
7 changed files with 88 additions and 31 deletions
+1 -1
View File
@@ -125,7 +125,7 @@ impl BlockCollection {
};
// crop to peers best
if range.start >= peer_best {
if range.start > peer_best {
trace!(target: "sync", "Out of range for peer {} ({} vs {})", peer_id, range.start, peer_best);
return None;
}
+60 -18
View File
@@ -19,11 +19,10 @@
use std::collections::{HashMap, HashSet};
use futures::sync::{oneshot, mpsc};
use std::time::{Instant, Duration};
use std::collections::hash_map::Entry;
use io::SyncIo;
use protocol::Protocol;
use network::PeerId;
use primitives::Hash;
use primitives::{Hash, block::HeaderHash, block::Id as BlockId, block::Header};
use message::{self, Message};
use runtime_support::Hashable;
@@ -47,18 +46,22 @@ pub struct Consensus {
our_candidate: Option<(Hash, Vec<u8>)>,
statement_sink: Option<mpsc::UnboundedSender<message::Statement>>,
bft_message_sink: Option<(mpsc::UnboundedSender<message::LocalizedBftMessage>, Hash)>,
messages: HashMap<Hash, (Instant, message::Message)>,
messages: Vec<(Hash, Instant, message::Message)>,
message_hashes: HashSet<Hash>,
last_block_hash: HeaderHash,
}
impl Consensus {
/// Create a new instance.
pub fn new() -> Consensus {
pub fn new(best_hash: HeaderHash) -> Consensus {
Consensus {
peers: HashMap::new(),
our_candidate: None,
statement_sink: None,
bft_message_sink: None,
messages: Default::default(),
message_hashes: Default::default(),
last_block_hash: best_hash,
}
}
@@ -75,9 +78,9 @@ impl Consensus {
// Send out all known messages.
// TODO: limit by size
let mut known_messages = HashSet::new();
for (hash, &(_, ref m)) in self.messages.iter() {
for &(ref hash, _, ref message) in self.messages.iter() {
known_messages.insert(hash.clone());
protocol.send_message(io, peer_id, m.clone());
protocol.send_message(io, peer_id, message.clone());
}
self.peers.insert(peer_id, PeerConsensus {
candidate_fetch: None,
@@ -96,13 +99,13 @@ impl Consensus {
}
fn register_message(&mut self, hash: Hash, message: message::Message) {
if let Entry::Vacant(entry) = self.messages.entry(hash) {
entry.insert((Instant::now(), message));
if self.message_hashes.insert(hash) {
self.messages.push((hash, Instant::now(), message));
}
}
pub fn on_statement(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, statement: message::Statement, hash: Hash) {
if self.messages.contains_key(&hash) {
if self.message_hashes.contains(&hash) {
trace!(target:"sync", "Ignored already known statement from {}", peer_id);
}
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
@@ -137,11 +140,25 @@ impl Consensus {
}
pub fn on_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, message: message::LocalizedBftMessage, hash: Hash) {
if self.messages.contains_key(&hash) {
if self.message_hashes.contains(&hash) {
trace!(target:"sync", "Ignored already known BFT message from {}", peer_id);
return;
}
match (protocol.chain().info(), protocol.chain().header(&BlockId::Hash(message.parent_hash))) {
(_, Err(e)) | (Err(e), _) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
return;
},
(Ok(info), Ok(Some(header))) => {
if header.number < info.chain.best_number {
trace!(target:"sync", "Ignored ancient BFT message from {}, hash={}", peer_id, message.parent_hash);
return;
}
},
(Ok(_), Ok(None)) => {},
}
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
peer.known_messages.insert(hash);
// TODO: validate signature?
@@ -168,9 +185,9 @@ impl Consensus {
pub fn bft_messages(&mut self, parent_hash: Hash) -> mpsc::UnboundedReceiver<message::LocalizedBftMessage>{
let (sink, stream) = mpsc::unbounded();
for (_, message) in self.messages.iter() {
for &(_, _, ref message) in self.messages.iter() {
let bft_message = match *message {
(_, Message::BftMessage(ref msg)) => msg,
Message::BftMessage(ref msg) => msg,
_ => continue,
};
@@ -266,17 +283,42 @@ impl Consensus {
self.peers.remove(&peer_id);
}
pub fn collect_garbage(&mut self) {
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS);
let now = Instant::now();
pub fn collect_garbage(&mut self, best_hash_and_header: Option<(HeaderHash, &Header)>) {
let hashes = &mut self.message_hashes;
let last_block_hash = &mut self.last_block_hash;
let before = self.messages.len();
self.messages.retain(|_, &mut (timestamp, _)| timestamp < now + expiration);
let (best_hash, best_header) = best_hash_and_header.map(|(h, header)| (Some(h), Some(header))).unwrap_or((None, None));
if best_header.as_ref().map_or(false, |header| header.parent_hash != *last_block_hash) {
trace!(target:"sync", "Clearing conensus message cache");
self.messages.clear();
hashes.clear();
} else {
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS);
let now = Instant::now();
if let Some(hash) = best_hash {
*last_block_hash = hash;
}
self.messages.retain(|&(ref hash, timestamp, ref message)| {
timestamp < now + expiration ||
best_header.map_or(true, |header| {
if match *message {
Message::BftMessage(ref msg) => msg.parent_hash != header.parent_hash,
Message::Statement(ref msg) => msg.parent_hash != header.parent_hash,
_ => true,
} {
hashes.remove(hash);
true
} else {
false
}
})
});
}
if self.messages.len() != before {
trace!(target:"sync", "Cleaned up {} stale messages", before - self.messages.len());
}
let messages = &self.messages;
for (_, ref mut peer) in self.peers.iter_mut() {
peer.known_messages.retain(|h| messages.contains_key(h));
peer.known_messages.retain(|h| hashes.contains(h));
}
}
}
@@ -225,6 +225,7 @@ pub enum SignedConsensusMessage {
/// A vote.
Vote(SignedConsensusVote),
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
/// A network message.
pub enum Message {
+10 -5
View File
@@ -36,7 +36,7 @@ use io::SyncIo;
use error;
use super::header_hash;
const REQUEST_TIMEOUT_SEC: u64 = 15;
const REQUEST_TIMEOUT_SEC: u64 = 40;
const PROTOCOL_VERSION: u32 = 0;
// Maximum allowed entries in `BlockResponse`
@@ -114,12 +114,13 @@ impl Protocol {
/// Create a new instance.
pub fn new(config: ProtocolConfig, chain: Arc<Client>, transaction_pool: Arc<TransactionPool>) -> error::Result<Protocol> {
let info = chain.info()?;
let best_hash = info.chain.best_hash;
let protocol = Protocol {
config: config,
chain: chain,
genesis_hash: info.chain.genesis_hash,
sync: RwLock::new(ChainSync::new(&info)),
consensus: Mutex::new(Consensus::new()),
consensus: Mutex::new(Consensus::new(best_hash)),
peers: RwLock::new(HashMap::new()),
handshaking_peers: RwLock::new(HashMap::new()),
transaction_pool: transaction_pool,
@@ -344,7 +345,7 @@ impl Protocol {
/// Perform time based maintenance.
pub fn tick(&self, io: &mut SyncIo) {
self.maintain_peers(io);
self.consensus.lock().collect_garbage();
self.consensus.lock().collect_garbage(None);
}
fn maintain_peers(&self, io: &mut SyncIo) {
@@ -387,6 +388,8 @@ impl Protocol {
return;
}
let mut sync = self.sync.write();
let mut consensus = self.consensus.lock();
{
let mut peers = self.peers.write();
let mut handshaking_peers = self.handshaking_peers.write();
@@ -420,8 +423,8 @@ impl Protocol {
handshaking_peers.remove(&peer_id);
debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id));
}
self.sync.write().new_peer(io, self, peer_id);
self.consensus.lock().new_peer(io, self, peer_id, &status.roles);
sync.new_peer(io, self, peer_id);
consensus.new_peer(io, self, peer_id, &status.roles);
}
/// Called when peer sends us new transactions
@@ -511,6 +514,8 @@ impl Protocol {
}));
}
}
self.consensus.lock().collect_garbage(Some((hash, &header)));
}
pub fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats> {
+6 -4
View File
@@ -114,13 +114,13 @@ impl ChainSync {
io.disable_peer(peer_id);
},
(Ok(BlockStatus::Unknown), 0) => {
debug!(target:"sync", "New peer with unkown genesis hash {} ({}).", info.best_hash, info.best_number);
debug!(target:"sync", "New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number);
io.disable_peer(peer_id);
},
(Ok(BlockStatus::Unknown), _) => {
let our_best = self.best_queued_number;
if our_best > 0 {
debug!(target:"sync", "New peer with unkown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number);
debug!(target:"sync", "New peer with unknown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number);
self.peers.insert(peer_id, PeerSync {
common_hash: self.genesis_hash,
common_number: 0,
@@ -187,13 +187,15 @@ impl ChainSync {
trace!(target:"sync", "Found common ancestor for peer {}: {} ({})", peer_id, block.hash, n);
vec![]
},
Ok(_) if n > 0 => {
Ok(our_best) if n > 0 => {
trace!(target:"sync", "Ancestry block mismatch for peer {}: theirs: {} ({}), ours: {:?}", peer_id, block.hash, n, our_best);
let n = n - 1;
peer.state = PeerSyncState::AncestorSearch(n);
Self::request_ancestry(io, protocol, peer_id, n);
return;
},
Ok(_) => { // genesis mismatch
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", peer_id);
io.disable_peer(peer_id);
return;
},
@@ -326,7 +328,7 @@ impl ChainSync {
let stale = header.number <= self.best_queued_number;
if stale {
if !self.is_known_or_already_downloading(protocol, &header.parent_hash) {
trace!(target: "sync", "Ignoring unkown stale block announce from {}: {} {:?}", peer_id, hash, header);
trace!(target: "sync", "Ignoring unknown stale block announce from {}: {} {:?}", peer_id, hash, header);
} else {
trace!(target: "sync", "Downloading new stale block announced from {}: {} {:?}", peer_id, hash, header);
self.download_stale(io, protocol, peer_id, &hash);
@@ -34,16 +34,18 @@ fn bft_messages_include_those_sent_before_asking_for_stream() {
signature: Default::default(),
}));
let parent_hash = peer.genesis_hash();
let localized = LocalizedBftMessage {
message: bft_message,
parent_hash: [1; 32].into(),
parent_hash: parent_hash,
};
let as_bytes = ::serde_json::to_vec(&Message::BftMessage(localized.clone())).unwrap();
peer.sync.handle_packet(&mut io, 1, &as_bytes[..]);
let stream = peer.sync.bft_messages([1; 32].into());
let stream = peer.sync.bft_messages(parent_hash);
assert_eq!(stream.wait().next(), Some(Ok(localized)));
}
+6 -1
View File
@@ -23,7 +23,7 @@ use std::sync::Arc;
use parking_lot::RwLock;
use client;
use client::block_builder::BlockBuilder;
use primitives::block::{Id as BlockId, ExtrinsicHash};
use primitives::block::{Id as BlockId, ExtrinsicHash, HeaderHash};
use primitives;
use io::SyncIo;
use protocol::Protocol;
@@ -187,6 +187,11 @@ impl Peer {
self.generate_blocks(count, |_| ());
}
}
pub fn genesis_hash(&self) -> HeaderHash {
let info = self.client.info().expect("In-mem client does not fail");
info.chain.genesis_hash
}
}
struct EmptyTransactionPool;