mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-29 18:27:56 +00:00
Ensure all known BFT messages are imported when starting consensus (#147)
* a little more BFT tracing * import cached BFT messages into the produced stream
This commit is contained in:
committed by
GitHub
parent
a0b9c1147f
commit
7d54678331
@@ -22,7 +22,6 @@
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::sync::Arc;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use futures::{future, Future, Stream, Sink, Async, Canceled, Poll};
|
||||
use parking_lot::Mutex;
|
||||
use substrate_network as net;
|
||||
@@ -41,7 +40,6 @@ use error;
|
||||
|
||||
const TIMER_DELAY_MS: u64 = 5000;
|
||||
const TIMER_INTERVAL_MS: u64 = 500;
|
||||
const MESSAGE_LIFETIME_SEC: u64 = 10;
|
||||
|
||||
struct BftSink<E> {
|
||||
network: Arc<net::ConsensusService>,
|
||||
@@ -49,49 +47,9 @@ struct BftSink<E> {
|
||||
_e: ::std::marker::PhantomData<E>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct SharedMessageCollection {
|
||||
/// Messages for consensus over a block with known hash. Also holds timestamp of the first message.
|
||||
messages: Arc<Mutex<HashMap<HeaderHash, (Instant, VecDeque<net::LocalizedBftMessage>)>>>,
|
||||
}
|
||||
|
||||
impl SharedMessageCollection {
|
||||
fn new() -> SharedMessageCollection {
|
||||
SharedMessageCollection {
|
||||
messages: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
fn select(&self, parent_hash: HeaderHash, stream: net::BftMessageStream, authorities: Vec<AuthorityId>) -> Messages {
|
||||
Messages {
|
||||
messages: self.messages.lock().remove(&parent_hash).map(|(_, m)| m).unwrap_or_else(VecDeque::new),
|
||||
parent_hash,
|
||||
network_stream: stream,
|
||||
authorities: authorities,
|
||||
collection: self.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn push(&self, message: net::LocalizedBftMessage) {
|
||||
self.messages.lock()
|
||||
.entry(message.parent_hash)
|
||||
.or_insert_with(|| (Instant::now(), VecDeque::new()))
|
||||
.1.push_back(message);
|
||||
}
|
||||
|
||||
fn collect_garbage(&self) {
|
||||
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SEC);
|
||||
let now = Instant::now();
|
||||
self.messages.lock().retain(|_, &mut (timestamp, _)| timestamp < now + expiration);
|
||||
}
|
||||
}
|
||||
|
||||
struct Messages {
|
||||
parent_hash: HeaderHash,
|
||||
messages: VecDeque<net::LocalizedBftMessage>,
|
||||
network_stream: net::BftMessageStream,
|
||||
authorities: Vec<AuthorityId>,
|
||||
collection: SharedMessageCollection,
|
||||
}
|
||||
|
||||
impl Stream for Messages {
|
||||
@@ -99,14 +57,6 @@ impl Stream for Messages {
|
||||
type Error = bft::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
// push buffered messages first
|
||||
while let Some(message) = self.messages.pop_front() {
|
||||
match process_message(message, &self.authorities) {
|
||||
Ok(message) => return Ok(Async::Ready(Some(message))),
|
||||
Err(e) => debug!("Message validation failed: {:?}", e),
|
||||
}
|
||||
}
|
||||
|
||||
// check the network
|
||||
loop {
|
||||
match self.network_stream.poll() {
|
||||
@@ -114,15 +64,11 @@ impl Stream for Messages {
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end.
|
||||
Ok(Async::Ready(Some(message))) => {
|
||||
if message.parent_hash == self.parent_hash {
|
||||
match process_message(message, &self.authorities) {
|
||||
Ok(message) => return Ok(Async::Ready(Some(message))),
|
||||
Err(e) => {
|
||||
debug!("Message validation failed: {:?}", e);
|
||||
}
|
||||
match process_message(message, &self.authorities) {
|
||||
Ok(message) => return Ok(Async::Ready(Some(message))),
|
||||
Err(e) => {
|
||||
debug!("Message validation failed: {:?}", e);
|
||||
}
|
||||
} else {
|
||||
self.collection.push(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -226,18 +172,17 @@ fn start_bft<F, C>(
|
||||
client: &bft::Authorities,
|
||||
network: Arc<net::ConsensusService>,
|
||||
bft_service: &BftService<F, C>,
|
||||
messages: SharedMessageCollection
|
||||
) where
|
||||
F: bft::ProposerFactory + 'static,
|
||||
C: bft::BlockImport + bft::Authorities + 'static,
|
||||
<F as bft::ProposerFactory>::Error: ::std::fmt::Debug,
|
||||
<F::Proposer as bft::Proposer>::Error: ::std::fmt::Display + Into<error::Error>,
|
||||
{
|
||||
let hash = header.blake2_256().into();
|
||||
if bft_service.live_agreement().map_or(false, |h| h == hash) {
|
||||
let parent_hash = header.blake2_256().into();
|
||||
if bft_service.live_agreement().map_or(false, |h| h == parent_hash) {
|
||||
return;
|
||||
}
|
||||
let authorities = match client.authorities(&BlockId::Hash(hash)) {
|
||||
let authorities = match client.authorities(&BlockId::Hash(parent_hash)) {
|
||||
Ok(authorities) => authorities,
|
||||
Err(e) => {
|
||||
debug!("Error reading authorities: {:?}", e);
|
||||
@@ -245,12 +190,16 @@ fn start_bft<F, C>(
|
||||
}
|
||||
};
|
||||
|
||||
let input = messages.select(hash, network.bft_messages(), authorities).map_err(|e| e.into());
|
||||
let output = BftSink { network: network, parent_hash: hash.clone(), _e: Default::default() };
|
||||
match bft_service.build_upon(&header, input, output) {
|
||||
let input = Messages {
|
||||
network_stream: network.bft_messages(parent_hash),
|
||||
authorities,
|
||||
};
|
||||
|
||||
let output = BftSink { network: network, parent_hash: parent_hash, _e: Default::default() };
|
||||
match bft_service.build_upon(&header, input.map_err(Into::into), output) {
|
||||
Ok(Some(bft)) => handle.spawn(bft),
|
||||
Ok(None) => {},
|
||||
Err(e) => debug!(target: "bft","BFT agreement error: {:?}", e),
|
||||
Err(e) => debug!(target: "bft", "BFT agreement error: {:?}", e),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -281,7 +230,6 @@ impl Service {
|
||||
network: Network(network.clone()),
|
||||
handle: core.handle(),
|
||||
};
|
||||
let messages = SharedMessageCollection::new();
|
||||
let bft_service = Arc::new(BftService::new(client.clone(), key, factory));
|
||||
|
||||
let notifications = {
|
||||
@@ -289,11 +237,10 @@ impl Service {
|
||||
let network = network.clone();
|
||||
let client = client.clone();
|
||||
let bft_service = bft_service.clone();
|
||||
let messages = messages.clone();
|
||||
|
||||
client.import_notification_stream().for_each(move |notification| {
|
||||
if notification.is_new_best {
|
||||
start_bft(¬ification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone());
|
||||
start_bft(¬ification.header, handle.clone(), &*client, network.clone(), &*bft_service);
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
@@ -316,16 +263,14 @@ impl Service {
|
||||
let c = client.clone();
|
||||
let s = bft_service.clone();
|
||||
let n = network.clone();
|
||||
let m = messages.clone();
|
||||
let handle = core.handle();
|
||||
|
||||
interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
|
||||
if let Ok(best_block) = c.best_block_header() {
|
||||
let hash = best_block.blake2_256();
|
||||
m.collect_garbage();
|
||||
if hash == prev_best {
|
||||
debug!("Starting consensus round after a timeout");
|
||||
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone());
|
||||
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s);
|
||||
}
|
||||
prev_best = hash;
|
||||
}
|
||||
|
||||
@@ -339,6 +339,7 @@ impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, A
|
||||
count.committed += 1;
|
||||
|
||||
if count.committed >= self.threshold {
|
||||
trace!(target: "bft", "observed threshold-commit for round {} with {} commits", self.round_number, count.committed);
|
||||
Some(digest)
|
||||
} else {
|
||||
None
|
||||
|
||||
@@ -348,6 +348,7 @@ impl<P, I> BftService<P, I>
|
||||
|
||||
let n = authorities.len();
|
||||
let max_faulty = max_faulty_of(n);
|
||||
trace!(target: "bft", "max_faulty_of({})={}", n, max_faulty);
|
||||
|
||||
let local_id = self.key.public().0;
|
||||
|
||||
|
||||
@@ -46,7 +46,7 @@ pub struct Consensus {
|
||||
peers: HashMap<PeerId, PeerConsensus>,
|
||||
our_candidate: Option<(Hash, Vec<u8>)>,
|
||||
statement_sink: Option<mpsc::UnboundedSender<message::Statement>>,
|
||||
bft_message_sink: Option<mpsc::UnboundedSender<message::LocalizedBftMessage>>,
|
||||
bft_message_sink: Option<(mpsc::UnboundedSender<message::LocalizedBftMessage>, Hash)>,
|
||||
messages: HashMap<Hash, (Instant, message::Message)>,
|
||||
}
|
||||
|
||||
@@ -143,26 +143,41 @@ impl Consensus {
|
||||
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
||||
peer.known_messages.insert(hash);
|
||||
// TODO: validate signature?
|
||||
if let Some(sink) = self.bft_message_sink.take() {
|
||||
if let Err(e) = sink.unbounded_send(message.clone()) {
|
||||
trace!(target:"sync", "Error broadcasting BFT message notification: {:?}", e);
|
||||
} else {
|
||||
self.bft_message_sink = Some(sink);
|
||||
if let Some((sink, parent_hash)) = self.bft_message_sink.take() {
|
||||
if message.parent_hash == parent_hash {
|
||||
if let Err(e) = sink.unbounded_send(message.clone()) {
|
||||
trace!(target:"sync", "Error broadcasting BFT message notification: {:?}", e);
|
||||
} else {
|
||||
self.bft_message_sink = Some((sink, parent_hash));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
trace!(target:"sync", "Ignored BFT statement from unregistered peer {}", peer_id);
|
||||
return;
|
||||
}
|
||||
|
||||
let message = Message::BftMessage(message);
|
||||
self.register_message(hash.clone(), message.clone());
|
||||
// Propagate to other peers.
|
||||
self.propagate(io, protocol, message, hash);
|
||||
}
|
||||
|
||||
pub fn bft_messages(&mut self) -> mpsc::UnboundedReceiver<message::LocalizedBftMessage>{
|
||||
pub fn bft_messages(&mut self, parent_hash: Hash) -> mpsc::UnboundedReceiver<message::LocalizedBftMessage>{
|
||||
let (sink, stream) = mpsc::unbounded();
|
||||
self.bft_message_sink = Some(sink);
|
||||
|
||||
for (_, message) in self.messages.iter() {
|
||||
let bft_message = match *message {
|
||||
(_, Message::BftMessage(ref msg)) => msg,
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
if bft_message.parent_hash == parent_hash {
|
||||
sink.unbounded_send(bft_message.clone()).expect("receiving end known to be open; qed");
|
||||
}
|
||||
}
|
||||
|
||||
self.bft_message_sink = Some((sink, parent_hash));
|
||||
stream
|
||||
}
|
||||
|
||||
|
||||
@@ -317,8 +317,8 @@ impl Protocol {
|
||||
}
|
||||
|
||||
/// See `ConsensusService` trait.
|
||||
pub fn bft_messages(&self) -> BftMessageStream {
|
||||
self.consensus.lock().bft_messages()
|
||||
pub fn bft_messages(&self, parent_hash: Hash) -> BftMessageStream {
|
||||
self.consensus.lock().bft_messages(parent_hash)
|
||||
}
|
||||
|
||||
/// See `ConsensusService` trait.
|
||||
|
||||
@@ -91,8 +91,9 @@ pub trait ConsensusService: Send + Sync {
|
||||
/// Pass `None` to clear the candidate.
|
||||
fn set_local_candidate(&self, candidate: Option<(Hash, Vec<u8>)>);
|
||||
|
||||
/// Get BFT message stream.
|
||||
fn bft_messages(&self) -> BftMessageStream;
|
||||
/// Get BFT message stream for messages corresponding to consensus on given
|
||||
/// parent hash.
|
||||
fn bft_messages(&self, parent_hash: Hash) -> BftMessageStream;
|
||||
/// Send out a BFT message.
|
||||
fn send_bft_message(&self, message: LocalizedBftMessage);
|
||||
}
|
||||
@@ -254,8 +255,8 @@ impl ConsensusService for Service {
|
||||
self.handler.protocol.set_local_candidate(candidate)
|
||||
}
|
||||
|
||||
fn bft_messages(&self) -> BftMessageStream {
|
||||
self.handler.protocol.bft_messages()
|
||||
fn bft_messages(&self, parent_hash: Hash) -> BftMessageStream {
|
||||
self.handler.protocol.bft_messages(parent_hash)
|
||||
}
|
||||
|
||||
fn send_bft_message(&self, message: LocalizedBftMessage) {
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use super::*;
|
||||
use message::*;
|
||||
use futures::Stream;
|
||||
|
||||
#[test]
|
||||
fn bft_messages_include_those_sent_before_asking_for_stream() {
|
||||
let mut config = ::config::ProtocolConfig::default();
|
||||
config.roles = ::service::Role::VALIDATOR | ::service::Role::FULL;
|
||||
|
||||
let mut net = TestNet::new_with_config(2, config);
|
||||
net.sync(); // necessary for handshaking
|
||||
|
||||
let peer = net.peer(0);
|
||||
let mut io = TestIo::new(&peer.queue, None);
|
||||
let bft_message = BftMessage::Consensus(SignedConsensusMessage::Vote(SignedConsensusVote {
|
||||
vote: ConsensusVote::AdvanceRound(0),
|
||||
sender: [0; 32],
|
||||
signature: Default::default(),
|
||||
}));
|
||||
|
||||
let localized = LocalizedBftMessage {
|
||||
message: bft_message,
|
||||
parent_hash: [1; 32].into(),
|
||||
};
|
||||
|
||||
|
||||
let as_bytes = ::serde_json::to_vec(&Message::BftMessage(localized.clone())).unwrap();
|
||||
peer.sync.handle_packet(&mut io, 1, &as_bytes[..]);
|
||||
|
||||
let stream = peer.sync.bft_messages([1; 32].into());
|
||||
|
||||
assert_eq!(stream.wait().next(), Some(Ok(localized)));
|
||||
}
|
||||
@@ -14,6 +14,7 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
mod consensus;
|
||||
mod sync;
|
||||
|
||||
use std::collections::{VecDeque, HashSet, HashMap};
|
||||
|
||||
Reference in New Issue
Block a user