mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 16:21:02 +00:00
Ensure all known BFT messages are imported when starting consensus (#147)
* a little more BFT tracing * import cached BFT messages into the produced stream
This commit is contained in:
committed by
GitHub
parent
f9d5a6a09f
commit
ab946b2b9f
@@ -22,7 +22,6 @@
|
|||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::collections::{HashMap, VecDeque};
|
|
||||||
use futures::{future, Future, Stream, Sink, Async, Canceled, Poll};
|
use futures::{future, Future, Stream, Sink, Async, Canceled, Poll};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use substrate_network as net;
|
use substrate_network as net;
|
||||||
@@ -41,7 +40,6 @@ use error;
|
|||||||
|
|
||||||
const TIMER_DELAY_MS: u64 = 5000;
|
const TIMER_DELAY_MS: u64 = 5000;
|
||||||
const TIMER_INTERVAL_MS: u64 = 500;
|
const TIMER_INTERVAL_MS: u64 = 500;
|
||||||
const MESSAGE_LIFETIME_SEC: u64 = 10;
|
|
||||||
|
|
||||||
struct BftSink<E> {
|
struct BftSink<E> {
|
||||||
network: Arc<net::ConsensusService>,
|
network: Arc<net::ConsensusService>,
|
||||||
@@ -49,49 +47,9 @@ struct BftSink<E> {
|
|||||||
_e: ::std::marker::PhantomData<E>,
|
_e: ::std::marker::PhantomData<E>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct SharedMessageCollection {
|
|
||||||
/// Messages for consensus over a block with known hash. Also holds timestamp of the first message.
|
|
||||||
messages: Arc<Mutex<HashMap<HeaderHash, (Instant, VecDeque<net::LocalizedBftMessage>)>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SharedMessageCollection {
|
|
||||||
fn new() -> SharedMessageCollection {
|
|
||||||
SharedMessageCollection {
|
|
||||||
messages: Arc::new(Mutex::new(HashMap::new())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn select(&self, parent_hash: HeaderHash, stream: net::BftMessageStream, authorities: Vec<AuthorityId>) -> Messages {
|
|
||||||
Messages {
|
|
||||||
messages: self.messages.lock().remove(&parent_hash).map(|(_, m)| m).unwrap_or_else(VecDeque::new),
|
|
||||||
parent_hash,
|
|
||||||
network_stream: stream,
|
|
||||||
authorities: authorities,
|
|
||||||
collection: self.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn push(&self, message: net::LocalizedBftMessage) {
|
|
||||||
self.messages.lock()
|
|
||||||
.entry(message.parent_hash)
|
|
||||||
.or_insert_with(|| (Instant::now(), VecDeque::new()))
|
|
||||||
.1.push_back(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn collect_garbage(&self) {
|
|
||||||
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SEC);
|
|
||||||
let now = Instant::now();
|
|
||||||
self.messages.lock().retain(|_, &mut (timestamp, _)| timestamp < now + expiration);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Messages {
|
struct Messages {
|
||||||
parent_hash: HeaderHash,
|
|
||||||
messages: VecDeque<net::LocalizedBftMessage>,
|
|
||||||
network_stream: net::BftMessageStream,
|
network_stream: net::BftMessageStream,
|
||||||
authorities: Vec<AuthorityId>,
|
authorities: Vec<AuthorityId>,
|
||||||
collection: SharedMessageCollection,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Stream for Messages {
|
impl Stream for Messages {
|
||||||
@@ -99,14 +57,6 @@ impl Stream for Messages {
|
|||||||
type Error = bft::Error;
|
type Error = bft::Error;
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
// push buffered messages first
|
|
||||||
while let Some(message) = self.messages.pop_front() {
|
|
||||||
match process_message(message, &self.authorities) {
|
|
||||||
Ok(message) => return Ok(Async::Ready(Some(message))),
|
|
||||||
Err(e) => debug!("Message validation failed: {:?}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// check the network
|
// check the network
|
||||||
loop {
|
loop {
|
||||||
match self.network_stream.poll() {
|
match self.network_stream.poll() {
|
||||||
@@ -114,16 +64,12 @@ impl Stream for Messages {
|
|||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end.
|
Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end.
|
||||||
Ok(Async::Ready(Some(message))) => {
|
Ok(Async::Ready(Some(message))) => {
|
||||||
if message.parent_hash == self.parent_hash {
|
|
||||||
match process_message(message, &self.authorities) {
|
match process_message(message, &self.authorities) {
|
||||||
Ok(message) => return Ok(Async::Ready(Some(message))),
|
Ok(message) => return Ok(Async::Ready(Some(message))),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!("Message validation failed: {:?}", e);
|
debug!("Message validation failed: {:?}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
self.collection.push(message);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -226,18 +172,17 @@ fn start_bft<F, C>(
|
|||||||
client: &bft::Authorities,
|
client: &bft::Authorities,
|
||||||
network: Arc<net::ConsensusService>,
|
network: Arc<net::ConsensusService>,
|
||||||
bft_service: &BftService<F, C>,
|
bft_service: &BftService<F, C>,
|
||||||
messages: SharedMessageCollection
|
|
||||||
) where
|
) where
|
||||||
F: bft::ProposerFactory + 'static,
|
F: bft::ProposerFactory + 'static,
|
||||||
C: bft::BlockImport + bft::Authorities + 'static,
|
C: bft::BlockImport + bft::Authorities + 'static,
|
||||||
<F as bft::ProposerFactory>::Error: ::std::fmt::Debug,
|
<F as bft::ProposerFactory>::Error: ::std::fmt::Debug,
|
||||||
<F::Proposer as bft::Proposer>::Error: ::std::fmt::Display + Into<error::Error>,
|
<F::Proposer as bft::Proposer>::Error: ::std::fmt::Display + Into<error::Error>,
|
||||||
{
|
{
|
||||||
let hash = header.blake2_256().into();
|
let parent_hash = header.blake2_256().into();
|
||||||
if bft_service.live_agreement().map_or(false, |h| h == hash) {
|
if bft_service.live_agreement().map_or(false, |h| h == parent_hash) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let authorities = match client.authorities(&BlockId::Hash(hash)) {
|
let authorities = match client.authorities(&BlockId::Hash(parent_hash)) {
|
||||||
Ok(authorities) => authorities,
|
Ok(authorities) => authorities,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!("Error reading authorities: {:?}", e);
|
debug!("Error reading authorities: {:?}", e);
|
||||||
@@ -245,9 +190,13 @@ fn start_bft<F, C>(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let input = messages.select(hash, network.bft_messages(), authorities).map_err(|e| e.into());
|
let input = Messages {
|
||||||
let output = BftSink { network: network, parent_hash: hash.clone(), _e: Default::default() };
|
network_stream: network.bft_messages(parent_hash),
|
||||||
match bft_service.build_upon(&header, input, output) {
|
authorities,
|
||||||
|
};
|
||||||
|
|
||||||
|
let output = BftSink { network: network, parent_hash: parent_hash, _e: Default::default() };
|
||||||
|
match bft_service.build_upon(&header, input.map_err(Into::into), output) {
|
||||||
Ok(Some(bft)) => handle.spawn(bft),
|
Ok(Some(bft)) => handle.spawn(bft),
|
||||||
Ok(None) => {},
|
Ok(None) => {},
|
||||||
Err(e) => debug!(target: "bft", "BFT agreement error: {:?}", e),
|
Err(e) => debug!(target: "bft", "BFT agreement error: {:?}", e),
|
||||||
@@ -281,7 +230,6 @@ impl Service {
|
|||||||
network: Network(network.clone()),
|
network: Network(network.clone()),
|
||||||
handle: core.handle(),
|
handle: core.handle(),
|
||||||
};
|
};
|
||||||
let messages = SharedMessageCollection::new();
|
|
||||||
let bft_service = Arc::new(BftService::new(client.clone(), key, factory));
|
let bft_service = Arc::new(BftService::new(client.clone(), key, factory));
|
||||||
|
|
||||||
let notifications = {
|
let notifications = {
|
||||||
@@ -289,11 +237,10 @@ impl Service {
|
|||||||
let network = network.clone();
|
let network = network.clone();
|
||||||
let client = client.clone();
|
let client = client.clone();
|
||||||
let bft_service = bft_service.clone();
|
let bft_service = bft_service.clone();
|
||||||
let messages = messages.clone();
|
|
||||||
|
|
||||||
client.import_notification_stream().for_each(move |notification| {
|
client.import_notification_stream().for_each(move |notification| {
|
||||||
if notification.is_new_best {
|
if notification.is_new_best {
|
||||||
start_bft(¬ification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone());
|
start_bft(¬ification.header, handle.clone(), &*client, network.clone(), &*bft_service);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
@@ -316,16 +263,14 @@ impl Service {
|
|||||||
let c = client.clone();
|
let c = client.clone();
|
||||||
let s = bft_service.clone();
|
let s = bft_service.clone();
|
||||||
let n = network.clone();
|
let n = network.clone();
|
||||||
let m = messages.clone();
|
|
||||||
let handle = core.handle();
|
let handle = core.handle();
|
||||||
|
|
||||||
interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
|
interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
|
||||||
if let Ok(best_block) = c.best_block_header() {
|
if let Ok(best_block) = c.best_block_header() {
|
||||||
let hash = best_block.blake2_256();
|
let hash = best_block.blake2_256();
|
||||||
m.collect_garbage();
|
|
||||||
if hash == prev_best {
|
if hash == prev_best {
|
||||||
debug!("Starting consensus round after a timeout");
|
debug!("Starting consensus round after a timeout");
|
||||||
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone());
|
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s);
|
||||||
}
|
}
|
||||||
prev_best = hash;
|
prev_best = hash;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user