mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 18:41:03 +00:00
Consensus message buffering and more (#114)
* CLI options and keystore integration * Replace multiqueue with future::mpsc * BFT gossip * Revert to app_dirs * generate_from_seed commented * Refactor event loop * Start consensus by timer * Message buffering * Minor fixes * Work around duty-roster issue. * some more minor fixes * fix compilation * more consistent formatting * make bft input stream never conclude * Minor fixes * add timestamp module to executive * more cleanups and logging * Fixed message propagation
This commit is contained in:
committed by
Gav Wood
parent
6a99c9a43d
commit
f532982623
@@ -6,7 +6,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
|
||||
[dependencies]
|
||||
futures = "0.1.17"
|
||||
parking_lot = "0.4"
|
||||
tokio-timer = "0.1.2"
|
||||
tokio-core = "0.1.12"
|
||||
ed25519 = { path = "../../substrate/ed25519" }
|
||||
error-chain = "0.11"
|
||||
log = "0.4"
|
||||
@@ -21,6 +21,5 @@ substrate-codec = { path = "../../substrate/codec" }
|
||||
substrate-primitives = { path = "../../substrate/primitives" }
|
||||
substrate-runtime-support = { path = "../../substrate/runtime-support" }
|
||||
substrate-network = { path = "../../substrate/network" }
|
||||
tokio-core = "0.1.12"
|
||||
substrate-keyring = { path = "../../substrate/keyring" }
|
||||
substrate-client = { path = "../../substrate/client" }
|
||||
|
||||
@@ -32,7 +32,6 @@
|
||||
extern crate futures;
|
||||
extern crate ed25519;
|
||||
extern crate parking_lot;
|
||||
extern crate tokio_timer;
|
||||
extern crate polkadot_api;
|
||||
extern crate polkadot_collator as collator;
|
||||
extern crate polkadot_statement_table as table;
|
||||
@@ -532,6 +531,8 @@ impl<C: PolkadotApi, R: TableRouter> bft::Proposer for Proposer<C, R> {
|
||||
type Evaluate = Result<bool, Error>;
|
||||
|
||||
fn propose(&self) -> Result<SubstrateBlock, Error> {
|
||||
debug!(target: "bft", "proposing block on top of parent ({}, {:?})", self.parent_number, self.parent_hash);
|
||||
|
||||
// TODO: handle case when current timestamp behind that in state.
|
||||
let mut block_builder = self.client.build_block(
|
||||
&self.parent_id,
|
||||
@@ -577,7 +578,18 @@ impl<C: PolkadotApi, R: TableRouter> bft::Proposer for Proposer<C, R> {
|
||||
|
||||
// TODO: certain kinds of errors here should lead to a misbehavior report.
|
||||
fn evaluate(&self, proposal: &SubstrateBlock) -> Result<bool, Error> {
|
||||
evaluate_proposal(proposal, &*self.client, current_timestamp(), &self.parent_hash, &self.parent_id)
|
||||
debug!(target: "bft", "evaluating block on top of parent ({}, {:?})", self.parent_number, self.parent_hash);
|
||||
match evaluate_proposal(proposal, &*self.client, current_timestamp(), &self.parent_hash, &self.parent_id) {
|
||||
Ok(x) => Ok(x),
|
||||
Err(e) => match *e.kind() {
|
||||
ErrorKind::PolkadotApi(polkadot_api::ErrorKind::Executor(_)) => Ok(false),
|
||||
ErrorKind::ProposalNotForPolkadot => Ok(false),
|
||||
ErrorKind::TimestampInFuture => Ok(false),
|
||||
ErrorKind::WrongParentHash(_, _) => Ok(false),
|
||||
ErrorKind::ProposalTooLarge(_) => Ok(false),
|
||||
_ => Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, bft::Misbehavior)>) {
|
||||
|
||||
@@ -20,12 +20,14 @@
|
||||
/// candidate agreement over the network.
|
||||
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::sync::Arc;
|
||||
use futures::{future, Future, Stream, Sink, Async, Canceled};
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use futures::{future, Future, Stream, Sink, Async, Canceled, Poll};
|
||||
use parking_lot::Mutex;
|
||||
use substrate_network as net;
|
||||
use tokio_core::reactor;
|
||||
use client::BlockchainEvents;
|
||||
use client::{BlockchainEvents, ChainHead};
|
||||
use runtime_support::Hashable;
|
||||
use primitives::{Hash, AuthorityId};
|
||||
use primitives::block::{Id as BlockId, HeaderHash, Header};
|
||||
@@ -35,15 +37,101 @@ use bft::{self, BftService};
|
||||
use transaction_pool::TransactionPool;
|
||||
use ed25519;
|
||||
use super::{TableRouter, SharedTable, ProposerFactory};
|
||||
use error::Error;
|
||||
use error;
|
||||
|
||||
const TIMER_DELAY_MS: u64 = 5000;
|
||||
const TIMER_INTERVAL_MS: u64 = 500;
|
||||
const MESSAGE_LIFETIME_SEC: u64 = 10;
|
||||
|
||||
struct BftSink<E> {
|
||||
network: Arc<net::ConsensusService>,
|
||||
parent_hash: HeaderHash,
|
||||
_e: ::std::marker::PhantomData<E>,
|
||||
}
|
||||
|
||||
fn process_message(msg: net::BftMessage, authorities: &[AuthorityId], parent_hash: HeaderHash) -> Result<bft::Communication, bft::Error> {
|
||||
Ok(match msg {
|
||||
#[derive(Clone)]
|
||||
struct SharedMessageCollection {
|
||||
/// Messages for consensus over a block with known hash. Also holds timestamp of the first message.
|
||||
messages: Arc<Mutex<HashMap<HeaderHash, (Instant, VecDeque<net::LocalizedBftMessage>)>>>,
|
||||
}
|
||||
|
||||
impl SharedMessageCollection {
|
||||
fn new() -> SharedMessageCollection {
|
||||
SharedMessageCollection {
|
||||
messages: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
fn select(&self, parent_hash: HeaderHash, stream: net::BftMessageStream, authorities: Vec<AuthorityId>) -> Messages {
|
||||
Messages {
|
||||
messages: self.messages.lock().remove(&parent_hash).map(|(_, m)| m).unwrap_or_else(VecDeque::new),
|
||||
parent_hash,
|
||||
network_stream: stream,
|
||||
authorities: authorities,
|
||||
collection: self.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn push(&self, message: net::LocalizedBftMessage) {
|
||||
self.messages.lock()
|
||||
.entry(message.parent_hash)
|
||||
.or_insert_with(|| (Instant::now(), VecDeque::new()))
|
||||
.1.push_back(message);
|
||||
}
|
||||
|
||||
fn collect_garbage(&self) {
|
||||
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SEC);
|
||||
let now = Instant::now();
|
||||
self.messages.lock().retain(|_, &mut (timestamp, _)| timestamp < now + expiration);
|
||||
}
|
||||
}
|
||||
|
||||
struct Messages {
|
||||
parent_hash: HeaderHash,
|
||||
messages: VecDeque<net::LocalizedBftMessage>,
|
||||
network_stream: net::BftMessageStream,
|
||||
authorities: Vec<AuthorityId>,
|
||||
collection: SharedMessageCollection,
|
||||
}
|
||||
|
||||
impl Stream for Messages {
|
||||
type Item = bft::Communication;
|
||||
type Error = bft::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
// push buffered messages first
|
||||
while let Some(message) = self.messages.pop_front() {
|
||||
match process_message(message, &self.authorities) {
|
||||
Ok(message) => return Ok(Async::Ready(Some(message))),
|
||||
Err(e) => debug!("Message validation failed: {:?}", e),
|
||||
}
|
||||
}
|
||||
|
||||
// check the network
|
||||
match self.network_stream.poll() {
|
||||
Err(_) => Err(bft::InputStreamConcluded.into()),
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Ok(Async::Ready(None)) => Ok(Async::NotReady), // the input stream for agreements is never meant to logically end.
|
||||
Ok(Async::Ready(Some(message))) => {
|
||||
if message.parent_hash == self.parent_hash {
|
||||
match process_message(message, &self.authorities) {
|
||||
Ok(message) => Ok(Async::Ready(Some(message))),
|
||||
Err(e) => {
|
||||
debug!("Message validation failed: {:?}", e);
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.collection.push(message);
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -> Result<bft::Communication, bft::Error> {
|
||||
Ok(match msg.message {
|
||||
net::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c {
|
||||
net::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({
|
||||
let proposal = bft::generic::LocalizedProposal {
|
||||
@@ -60,7 +148,7 @@ fn process_message(msg: net::BftMessage, authorities: &[AuthorityId], parent_has
|
||||
signer: ed25519::Public(proposal.sender),
|
||||
}
|
||||
};
|
||||
bft::check_proposal(authorities, &parent_hash, &proposal)?;
|
||||
bft::check_proposal(authorities, &msg.parent_hash, &proposal)?;
|
||||
proposal
|
||||
}),
|
||||
net::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({
|
||||
@@ -76,14 +164,14 @@ fn process_message(msg: net::BftMessage, authorities: &[AuthorityId], parent_has
|
||||
net::ConsensusVote::AdvanceRound(r) => bft::generic::Vote::AdvanceRound(r as usize),
|
||||
}
|
||||
};
|
||||
bft::check_vote(authorities, &parent_hash, &vote)?;
|
||||
bft::check_vote(authorities, &msg.parent_hash, &vote)?;
|
||||
vote
|
||||
}),
|
||||
}),
|
||||
net::BftMessage::Auxiliary(a) => {
|
||||
let justification = bft::UncheckedJustification::from(a);
|
||||
// TODO: get proper error
|
||||
let justification: Result<_, bft::Error> = bft::check_prepare_justification(authorities, parent_hash, justification)
|
||||
let justification: Result<_, bft::Error> = bft::check_prepare_justification(authorities, msg.parent_hash, justification)
|
||||
.map_err(|_| bft::ErrorKind::InvalidJustification.into());
|
||||
bft::generic::Communication::Auxiliary(justification?)
|
||||
},
|
||||
@@ -96,27 +184,30 @@ impl<E> Sink for BftSink<E> {
|
||||
type SinkError = E;
|
||||
|
||||
fn start_send(&mut self, message: bft::Communication) -> ::futures::StartSend<bft::Communication, E> {
|
||||
let network_message = match message {
|
||||
bft::generic::Communication::Consensus(c) => net::BftMessage::Consensus(match c {
|
||||
bft::generic::LocalizedMessage::Propose(proposal) => net::SignedConsensusMessage::Propose(net::SignedConsensusProposal {
|
||||
round_number: proposal.round_number as u32,
|
||||
proposal: proposal.proposal,
|
||||
digest: proposal.digest,
|
||||
sender: proposal.sender,
|
||||
digest_signature: proposal.digest_signature.signature,
|
||||
full_signature: proposal.full_signature.signature,
|
||||
let network_message = net::LocalizedBftMessage {
|
||||
message: match message {
|
||||
bft::generic::Communication::Consensus(c) => net::BftMessage::Consensus(match c {
|
||||
bft::generic::LocalizedMessage::Propose(proposal) => net::SignedConsensusMessage::Propose(net::SignedConsensusProposal {
|
||||
round_number: proposal.round_number as u32,
|
||||
proposal: proposal.proposal,
|
||||
digest: proposal.digest,
|
||||
sender: proposal.sender,
|
||||
digest_signature: proposal.digest_signature.signature,
|
||||
full_signature: proposal.full_signature.signature,
|
||||
}),
|
||||
bft::generic::LocalizedMessage::Vote(vote) => net::SignedConsensusMessage::Vote(net::SignedConsensusVote {
|
||||
sender: vote.sender,
|
||||
signature: vote.signature.signature,
|
||||
vote: match vote.vote {
|
||||
bft::generic::Vote::Prepare(r, h) => net::ConsensusVote::Prepare(r as u32, h),
|
||||
bft::generic::Vote::Commit(r, h) => net::ConsensusVote::Commit(r as u32, h),
|
||||
bft::generic::Vote::AdvanceRound(r) => net::ConsensusVote::AdvanceRound(r as u32),
|
||||
}
|
||||
}),
|
||||
}),
|
||||
bft::generic::LocalizedMessage::Vote(vote) => net::SignedConsensusMessage::Vote(net::SignedConsensusVote {
|
||||
sender: vote.sender,
|
||||
signature: vote.signature.signature,
|
||||
vote: match vote.vote {
|
||||
bft::generic::Vote::Prepare(r, h) => net::ConsensusVote::Prepare(r as u32, h),
|
||||
bft::generic::Vote::Commit(r, h) => net::ConsensusVote::Commit(r as u32, h),
|
||||
bft::generic::Vote::AdvanceRound(r) => net::ConsensusVote::AdvanceRound(r as u32),
|
||||
}
|
||||
}),
|
||||
}),
|
||||
bft::generic::Communication::Auxiliary(justification) => net::BftMessage::Auxiliary(justification.uncheck().into()),
|
||||
bft::generic::Communication::Auxiliary(justification) => net::BftMessage::Auxiliary(justification.uncheck().into()),
|
||||
},
|
||||
parent_hash: self.parent_hash,
|
||||
};
|
||||
self.network.send_bft_message(network_message);
|
||||
Ok(::futures::AsyncSink::Ready)
|
||||
@@ -134,17 +225,49 @@ pub struct Service {
|
||||
|
||||
struct Network(Arc<net::ConsensusService>);
|
||||
|
||||
fn start_bft<F, C>(
|
||||
header: &Header,
|
||||
handle: reactor::Handle,
|
||||
client: &bft::Authorities,
|
||||
network: Arc<net::ConsensusService>,
|
||||
bft_service: &BftService<F, C>,
|
||||
messages: SharedMessageCollection
|
||||
) where
|
||||
F: bft::ProposerFactory + 'static,
|
||||
C: bft::BlockImport + bft::Authorities + 'static,
|
||||
<F as bft::ProposerFactory>::Error: ::std::fmt::Debug,
|
||||
<F::Proposer as bft::Proposer>::Error: ::std::fmt::Display + Into<error::Error>,
|
||||
{
|
||||
let hash = header.blake2_256().into();
|
||||
if bft_service.live_agreement().map_or(false, |h| h == hash) {
|
||||
return;
|
||||
}
|
||||
let authorities = match client.authorities(&BlockId::Hash(hash)) {
|
||||
Ok(authorities) => authorities,
|
||||
Err(e) => {
|
||||
debug!("Error reading authorities: {:?}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let input = messages.select(hash, network.bft_messages(), authorities).map_err(|e| e.into());
|
||||
let output = BftSink { network: network, parent_hash: hash.clone(), _e: Default::default() };
|
||||
match bft_service.build_upon(&header, input, output) {
|
||||
Ok(Some(bft)) => handle.spawn(bft),
|
||||
Ok(None) => {},
|
||||
Err(e) => debug!("BFT agreement error: {:?}", e),
|
||||
}
|
||||
}
|
||||
|
||||
impl Service {
|
||||
/// Create and start a new instance.
|
||||
pub fn new<C>(
|
||||
client: Arc<C>,
|
||||
network: Arc<net::ConsensusService>,
|
||||
transaction_pool: Arc<Mutex<TransactionPool>>,
|
||||
key: ed25519::Pair,
|
||||
best_header: &Header) -> Service
|
||||
where C: BlockchainEvents + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static
|
||||
key: ed25519::Pair
|
||||
) -> Service
|
||||
where C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static
|
||||
{
|
||||
let best_header = best_header.clone();
|
||||
let thread = thread::spawn(move || {
|
||||
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
|
||||
let key = Arc::new(key);
|
||||
@@ -153,31 +276,44 @@ impl Service {
|
||||
transaction_pool: transaction_pool.clone(),
|
||||
network: Network(network.clone()),
|
||||
};
|
||||
let bft_service = BftService::new(client.clone(), key, factory);
|
||||
let build_bft = |header: &Header| -> Result<_, Error> {
|
||||
let hash = header.blake2_256().into();
|
||||
let authorities = client.authorities(&BlockId::Hash(hash))?;
|
||||
let input = network.bft_messages()
|
||||
.filter_map(move |message| {
|
||||
process_message(message, &authorities, hash.clone())
|
||||
.map_err(|e| debug!("Message validation failed: {:?}", e))
|
||||
.ok()
|
||||
})
|
||||
.map_err(|_| bft::InputStreamConcluded.into());
|
||||
let output = BftSink { network: network.clone(), _e: Default::default() };
|
||||
Ok(bft_service.build_upon(&header, input, output)?)
|
||||
let messages = SharedMessageCollection::new();
|
||||
let bft_service = Arc::new(BftService::new(client.clone(), key, factory));
|
||||
|
||||
let handle = core.handle();
|
||||
let notifications = client.import_notification_stream().for_each(|notification| {
|
||||
if notification.is_new_best {
|
||||
start_bft(¬ification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone());
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
|
||||
let interval = reactor::Interval::new_at(Instant::now() + Duration::from_millis(TIMER_DELAY_MS), Duration::from_millis(TIMER_INTERVAL_MS), &handle).unwrap();
|
||||
let mut prev_best = match client.best_block_header() {
|
||||
Ok(header) => header.blake2_256(),
|
||||
Err(e) => {
|
||||
warn!("Cant's start consensus service. Error reading best block header: {:?}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
// Kickstart BFT agreement on start.
|
||||
if let Err(e) = build_bft(&best_header)
|
||||
.map_err(|e| debug!("Error creating initial BFT agreement: {:?}", e))
|
||||
.and_then(|bft| core.run(bft))
|
||||
{
|
||||
debug!("Error starting initial BFT agreement: {:?}", e);
|
||||
}
|
||||
let bft = client.import_notification_stream().and_then(|notification| {
|
||||
build_bft(¬ification.header).map_err(|e| debug!("BFT agreement error: {:?}", e))
|
||||
}).for_each(|f| f);
|
||||
if let Err(e) = core.run(bft) {
|
||||
let c = client.clone();
|
||||
let s = bft_service.clone();
|
||||
let n = network.clone();
|
||||
let m = messages.clone();
|
||||
let handle = core.handle();
|
||||
let timed = interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
|
||||
if let Ok(best_block) = c.best_block_header() {
|
||||
let hash = best_block.blake2_256();
|
||||
m.collect_garbage();
|
||||
if hash == prev_best {
|
||||
debug!("Starting consensus round after a timeout");
|
||||
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone());
|
||||
}
|
||||
prev_best = hash;
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
core.handle().spawn(timed);
|
||||
if let Err(e) = core.run(notifications) {
|
||||
debug!("BFT event loop error {:?}", e);
|
||||
}
|
||||
});
|
||||
@@ -235,3 +371,4 @@ impl TableRouter for Router {
|
||||
future::ok(Extrinsic)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user