Consensus message buffering and more (#114)

* CLI options and keystore integration

* Replace multiqueue with future::mpsc

* BFT gossip

* Revert to app_dirs

* generate_from_seed commented

* Refactor event loop

* Start consensus by timer

* Message buffering

* Minor fixes

* Work around duty-roster issue.

* some more minor fixes

* fix compilation

* more consistent formatting

* make bft input stream never conclude

* Minor fixes

* add timestamp module to executive

* more cleanups and logging

* Fixed message propagation
This commit is contained in:
Arkadiy Paronyan
2018-04-06 19:18:26 +02:00
committed by Gav Wood
parent 633b9f4c0b
commit b3dd4e74fd
34 changed files with 413 additions and 146 deletions
+14 -1
View File
@@ -192,6 +192,16 @@ name = "crunchy"
version = "0.1.6" version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "ctrlc"
version = "1.1.1"
source = "git+https://github.com/paritytech/rust-ctrlc.git#b523017108bb2d571a7a69bd97bc406e63bc7a9d"
dependencies = [
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "demo-cli" name = "demo-cli"
version = "0.1.0" version = "0.1.0"
@@ -1113,6 +1123,7 @@ dependencies = [
name = "polkadot" name = "polkadot"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"ctrlc 1.1.1 (git+https://github.com/paritytech/rust-ctrlc.git)",
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"polkadot-cli 0.1.0", "polkadot-cli 0.1.0",
] ]
@@ -1130,6 +1141,7 @@ dependencies = [
"substrate-executor 0.1.0", "substrate-executor 0.1.0",
"substrate-keyring 0.1.0", "substrate-keyring 0.1.0",
"substrate-primitives 0.1.0", "substrate-primitives 0.1.0",
"substrate-runtime-executive 0.1.0",
"substrate-runtime-io 0.1.0", "substrate-runtime-io 0.1.0",
"substrate-state-machine 0.1.0", "substrate-state-machine 0.1.0",
] ]
@@ -1191,7 +1203,6 @@ dependencies = [
"substrate-primitives 0.1.0", "substrate-primitives 0.1.0",
"substrate-runtime-support 0.1.0", "substrate-runtime-support 0.1.0",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@@ -1679,6 +1690,7 @@ dependencies = [
"ed25519 0.1.0", "ed25519 0.1.0",
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-codec 0.1.0", "substrate-codec 0.1.0",
"substrate-executor 0.1.0", "substrate-executor 0.1.0",
@@ -2449,6 +2461,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e" "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e"
"checksum crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "24ce9782d4d5c53674646a6a4c1863a21a8fc0cb649b3c94dfc16e45071dea19" "checksum crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "24ce9782d4d5c53674646a6a4c1863a21a8fc0cb649b3c94dfc16e45071dea19"
"checksum crunchy 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "a2f4a431c5c9f662e1200b7c7f02c34e91361150e382089a8f2dec3ba680cbda" "checksum crunchy 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "a2f4a431c5c9f662e1200b7c7f02c34e91361150e382089a8f2dec3ba680cbda"
"checksum ctrlc 1.1.1 (git+https://github.com/paritytech/rust-ctrlc.git)" = "<none>"
"checksum difference 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3304d19798a8e067e48d8e69b2c37f0b5e9b4e462504ad9e27e9f3fce02bba8" "checksum difference 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3304d19798a8e067e48d8e69b2c37f0b5e9b4e462504ad9e27e9f3fce02bba8"
"checksum dtoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "09c3753c3db574d215cba4ea76018483895d7bff25a31b49ba45db21c48e50ab" "checksum dtoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "09c3753c3db574d215cba4ea76018483895d7bff25a31b49ba45db21c48e50ab"
"checksum edit-distance 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3bd26878c3d921f89797a4e1a1711919f999a9f6946bb6f5a4ffda126d297b7e" "checksum edit-distance 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3bd26878c3d921f89797a4e1a1711919f999a9f6946bb6f5a4ffda126d297b7e"
+1
View File
@@ -10,6 +10,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies] [dependencies]
error-chain = "0.11" error-chain = "0.11"
polkadot-cli = { path = "polkadot/cli" } polkadot-cli = { path = "polkadot/cli" }
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
[workspace] [workspace]
members = [ members = [
+1
View File
@@ -10,6 +10,7 @@ polkadot-runtime = { path = "../runtime" }
polkadot-primitives = { path = "../primitives" } polkadot-primitives = { path = "../primitives" }
substrate-codec = { path = "../../substrate/codec" } substrate-codec = { path = "../../substrate/codec" }
substrate-runtime-io = { path = "../../substrate/runtime-io" } substrate-runtime-io = { path = "../../substrate/runtime-io" }
substrate-runtime-executive = { path = "../../substrate/runtime/executive" }
substrate-client = { path = "../../substrate/client" } substrate-client = { path = "../../substrate/client" }
substrate-primitives = { path = "../../substrate/primitives" } substrate-primitives = { path = "../../substrate/primitives" }
substrate-executor = { path = "../../substrate/executor" } substrate-executor = { path = "../../substrate/executor" }
+6 -1
View File
@@ -24,6 +24,7 @@ extern crate substrate_codec as codec;
extern crate substrate_runtime_io as runtime_io; extern crate substrate_runtime_io as runtime_io;
extern crate substrate_client as client; extern crate substrate_client as client;
extern crate substrate_executor as substrate_executor; extern crate substrate_executor as substrate_executor;
extern crate substrate_runtime_executive;
extern crate substrate_primitives; extern crate substrate_primitives;
extern crate substrate_state_machine as state_machine; extern crate substrate_state_machine as state_machine;
@@ -323,16 +324,19 @@ impl<S: state_machine::Backend> BlockBuilder for ClientBlockBuilder<S>
} }
fn bake(mut self) -> Block { fn bake(mut self) -> Block {
use substrate_runtime_executive::extrinsics_root;
let mut ext = state_machine::Ext { let mut ext = state_machine::Ext {
overlay: &mut self.changes, overlay: &mut self.changes,
backend: &self.state, backend: &self.state,
}; };
let final_header = ::substrate_executor::with_native_environment( let mut final_header = ::substrate_executor::with_native_environment(
&mut ext, &mut ext,
move || runtime::Executive::finalise_block() move || runtime::Executive::finalise_block()
).expect("all inherent extrinsics pushed; all other extrinsics executed correctly; qed"); ).expect("all inherent extrinsics pushed; all other extrinsics executed correctly; qed");
final_header.extrinsics_root = extrinsics_root::<runtime_io::BlakeTwo256, _>(&self.extrinsics);
Block { Block {
header: final_header, header: final_header,
extrinsics: self.extrinsics, extrinsics: self.extrinsics,
@@ -404,6 +408,7 @@ mod tests {
let block = block_builder.bake(); let block = block_builder.bake();
assert_eq!(block.header.number, 1); assert_eq!(block.header.number, 1);
assert!(block.header.extrinsics_root != Default::default());
} }
#[test] #[test]
+1 -1
View File
@@ -12,7 +12,7 @@ log = "0.3"
hex-literal = "0.1" hex-literal = "0.1"
triehash = "0.1" triehash = "0.1"
ed25519 = { path = "../../substrate/ed25519" } ed25519 = { path = "../../substrate/ed25519" }
app_dirs = "1.2.1" app_dirs = "1.2"
substrate-client = { path = "../../substrate/client" } substrate-client = { path = "../../substrate/client" }
substrate-codec = { path = "../../substrate/codec" } substrate-codec = { path = "../../substrate/codec" }
substrate-runtime-io = { path = "../../substrate/runtime-io" } substrate-runtime-io = { path = "../../substrate/runtime-io" }
+4 -3
View File
@@ -43,6 +43,7 @@ pub mod error;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::mpsc;
/// Parse command line arguments and start the node. /// Parse command line arguments and start the node.
/// ///
@@ -52,7 +53,7 @@ use std::net::SocketAddr;
/// 9556-9591 Unassigned /// 9556-9591 Unassigned
/// 9803-9874 Unassigned /// 9803-9874 Unassigned
/// 9926-9949 Unassigned /// 9926-9949 Unassigned
pub fn run<I, T>(args: I) -> error::Result<()> where pub fn run<I, T>(args: I, exit: mpsc::Receiver<()>) -> error::Result<()> where
I: IntoIterator<Item = T>, I: IntoIterator<Item = T>,
T: Into<std::ffi::OsString> + Clone, T: Into<std::ffi::OsString> + Clone,
{ {
@@ -116,9 +117,9 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
address.set_port(rpc_port); address.set_port(rpc_port);
} }
let handler = rpc::rpc_handler(service.client()); let handler = rpc::rpc_handler(service.client());
let server = rpc::start_http(&address, handler)?; let _server = rpc::start_http(&address, handler)?;
server.wait(); exit.recv().ok();
Ok(()) Ok(())
} }
+1 -2
View File
@@ -6,7 +6,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies] [dependencies]
futures = "0.1.17" futures = "0.1.17"
parking_lot = "0.4" parking_lot = "0.4"
tokio-timer = "0.1.2" tokio-core = "0.1.12"
ed25519 = { path = "../../substrate/ed25519" } ed25519 = { path = "../../substrate/ed25519" }
error-chain = "0.11" error-chain = "0.11"
log = "0.4" log = "0.4"
@@ -21,6 +21,5 @@ substrate-codec = { path = "../../substrate/codec" }
substrate-primitives = { path = "../../substrate/primitives" } substrate-primitives = { path = "../../substrate/primitives" }
substrate-runtime-support = { path = "../../substrate/runtime-support" } substrate-runtime-support = { path = "../../substrate/runtime-support" }
substrate-network = { path = "../../substrate/network" } substrate-network = { path = "../../substrate/network" }
tokio-core = "0.1.12"
substrate-keyring = { path = "../../substrate/keyring" } substrate-keyring = { path = "../../substrate/keyring" }
substrate-client = { path = "../../substrate/client" } substrate-client = { path = "../../substrate/client" }
+14 -2
View File
@@ -32,7 +32,6 @@
extern crate futures; extern crate futures;
extern crate ed25519; extern crate ed25519;
extern crate parking_lot; extern crate parking_lot;
extern crate tokio_timer;
extern crate polkadot_api; extern crate polkadot_api;
extern crate polkadot_collator as collator; extern crate polkadot_collator as collator;
extern crate polkadot_statement_table as table; extern crate polkadot_statement_table as table;
@@ -532,6 +531,8 @@ impl<C: PolkadotApi, R: TableRouter> bft::Proposer for Proposer<C, R> {
type Evaluate = Result<bool, Error>; type Evaluate = Result<bool, Error>;
fn propose(&self) -> Result<SubstrateBlock, Error> { fn propose(&self) -> Result<SubstrateBlock, Error> {
debug!(target: "bft", "proposing block on top of parent ({}, {:?})", self.parent_number, self.parent_hash);
// TODO: handle case when current timestamp behind that in state. // TODO: handle case when current timestamp behind that in state.
let mut block_builder = self.client.build_block( let mut block_builder = self.client.build_block(
&self.parent_id, &self.parent_id,
@@ -577,7 +578,18 @@ impl<C: PolkadotApi, R: TableRouter> bft::Proposer for Proposer<C, R> {
// TODO: certain kinds of errors here should lead to a misbehavior report. // TODO: certain kinds of errors here should lead to a misbehavior report.
fn evaluate(&self, proposal: &SubstrateBlock) -> Result<bool, Error> { fn evaluate(&self, proposal: &SubstrateBlock) -> Result<bool, Error> {
evaluate_proposal(proposal, &*self.client, current_timestamp(), &self.parent_hash, &self.parent_id) debug!(target: "bft", "evaluating block on top of parent ({}, {:?})", self.parent_number, self.parent_hash);
match evaluate_proposal(proposal, &*self.client, current_timestamp(), &self.parent_hash, &self.parent_id) {
Ok(x) => Ok(x),
Err(e) => match *e.kind() {
ErrorKind::PolkadotApi(polkadot_api::ErrorKind::Executor(_)) => Ok(false),
ErrorKind::ProposalNotForPolkadot => Ok(false),
ErrorKind::TimestampInFuture => Ok(false),
ErrorKind::WrongParentHash(_, _) => Ok(false),
ErrorKind::ProposalTooLarge(_) => Ok(false),
_ => Err(e),
}
}
} }
fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, bft::Misbehavior)>) { fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, bft::Misbehavior)>) {
+193 -56
View File
@@ -20,12 +20,14 @@
/// candidate agreement over the network. /// candidate agreement over the network.
use std::thread; use std::thread;
use std::time::{Duration, Instant};
use std::sync::Arc; use std::sync::Arc;
use futures::{future, Future, Stream, Sink, Async, Canceled}; use std::collections::{HashMap, VecDeque};
use futures::{future, Future, Stream, Sink, Async, Canceled, Poll};
use parking_lot::Mutex; use parking_lot::Mutex;
use substrate_network as net; use substrate_network as net;
use tokio_core::reactor; use tokio_core::reactor;
use client::BlockchainEvents; use client::{BlockchainEvents, ChainHead};
use runtime_support::Hashable; use runtime_support::Hashable;
use primitives::{Hash, AuthorityId}; use primitives::{Hash, AuthorityId};
use primitives::block::{Id as BlockId, HeaderHash, Header}; use primitives::block::{Id as BlockId, HeaderHash, Header};
@@ -35,15 +37,101 @@ use bft::{self, BftService};
use transaction_pool::TransactionPool; use transaction_pool::TransactionPool;
use ed25519; use ed25519;
use super::{TableRouter, SharedTable, ProposerFactory}; use super::{TableRouter, SharedTable, ProposerFactory};
use error::Error; use error;
const TIMER_DELAY_MS: u64 = 5000;
const TIMER_INTERVAL_MS: u64 = 500;
const MESSAGE_LIFETIME_SEC: u64 = 10;
struct BftSink<E> { struct BftSink<E> {
network: Arc<net::ConsensusService>, network: Arc<net::ConsensusService>,
parent_hash: HeaderHash,
_e: ::std::marker::PhantomData<E>, _e: ::std::marker::PhantomData<E>,
} }
fn process_message(msg: net::BftMessage, authorities: &[AuthorityId], parent_hash: HeaderHash) -> Result<bft::Communication, bft::Error> { #[derive(Clone)]
Ok(match msg { struct SharedMessageCollection {
/// Messages for consensus over a block with known hash. Also holds timestamp of the first message.
messages: Arc<Mutex<HashMap<HeaderHash, (Instant, VecDeque<net::LocalizedBftMessage>)>>>,
}
impl SharedMessageCollection {
fn new() -> SharedMessageCollection {
SharedMessageCollection {
messages: Arc::new(Mutex::new(HashMap::new())),
}
}
fn select(&self, parent_hash: HeaderHash, stream: net::BftMessageStream, authorities: Vec<AuthorityId>) -> Messages {
Messages {
messages: self.messages.lock().remove(&parent_hash).map(|(_, m)| m).unwrap_or_else(VecDeque::new),
parent_hash,
network_stream: stream,
authorities: authorities,
collection: self.clone(),
}
}
fn push(&self, message: net::LocalizedBftMessage) {
self.messages.lock()
.entry(message.parent_hash)
.or_insert_with(|| (Instant::now(), VecDeque::new()))
.1.push_back(message);
}
fn collect_garbage(&self) {
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SEC);
let now = Instant::now();
self.messages.lock().retain(|_, &mut (timestamp, _)| timestamp < now + expiration);
}
}
struct Messages {
parent_hash: HeaderHash,
messages: VecDeque<net::LocalizedBftMessage>,
network_stream: net::BftMessageStream,
authorities: Vec<AuthorityId>,
collection: SharedMessageCollection,
}
impl Stream for Messages {
type Item = bft::Communication;
type Error = bft::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// push buffered messages first
while let Some(message) = self.messages.pop_front() {
match process_message(message, &self.authorities) {
Ok(message) => return Ok(Async::Ready(Some(message))),
Err(e) => debug!("Message validation failed: {:?}", e),
}
}
// check the network
match self.network_stream.poll() {
Err(_) => Err(bft::InputStreamConcluded.into()),
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => Ok(Async::NotReady), // the input stream for agreements is never meant to logically end.
Ok(Async::Ready(Some(message))) => {
if message.parent_hash == self.parent_hash {
match process_message(message, &self.authorities) {
Ok(message) => Ok(Async::Ready(Some(message))),
Err(e) => {
debug!("Message validation failed: {:?}", e);
Ok(Async::NotReady)
}
}
} else {
self.collection.push(message);
Ok(Async::NotReady)
}
}
}
}
}
fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -> Result<bft::Communication, bft::Error> {
Ok(match msg.message {
net::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c { net::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c {
net::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({ net::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({
let proposal = bft::generic::LocalizedProposal { let proposal = bft::generic::LocalizedProposal {
@@ -60,7 +148,7 @@ fn process_message(msg: net::BftMessage, authorities: &[AuthorityId], parent_has
signer: ed25519::Public(proposal.sender), signer: ed25519::Public(proposal.sender),
} }
}; };
bft::check_proposal(authorities, &parent_hash, &proposal)?; bft::check_proposal(authorities, &msg.parent_hash, &proposal)?;
proposal proposal
}), }),
net::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({ net::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({
@@ -76,14 +164,14 @@ fn process_message(msg: net::BftMessage, authorities: &[AuthorityId], parent_has
net::ConsensusVote::AdvanceRound(r) => bft::generic::Vote::AdvanceRound(r as usize), net::ConsensusVote::AdvanceRound(r) => bft::generic::Vote::AdvanceRound(r as usize),
} }
}; };
bft::check_vote(authorities, &parent_hash, &vote)?; bft::check_vote(authorities, &msg.parent_hash, &vote)?;
vote vote
}), }),
}), }),
net::BftMessage::Auxiliary(a) => { net::BftMessage::Auxiliary(a) => {
let justification = bft::UncheckedJustification::from(a); let justification = bft::UncheckedJustification::from(a);
// TODO: get proper error // TODO: get proper error
let justification: Result<_, bft::Error> = bft::check_prepare_justification(authorities, parent_hash, justification) let justification: Result<_, bft::Error> = bft::check_prepare_justification(authorities, msg.parent_hash, justification)
.map_err(|_| bft::ErrorKind::InvalidJustification.into()); .map_err(|_| bft::ErrorKind::InvalidJustification.into());
bft::generic::Communication::Auxiliary(justification?) bft::generic::Communication::Auxiliary(justification?)
}, },
@@ -96,27 +184,30 @@ impl<E> Sink for BftSink<E> {
type SinkError = E; type SinkError = E;
fn start_send(&mut self, message: bft::Communication) -> ::futures::StartSend<bft::Communication, E> { fn start_send(&mut self, message: bft::Communication) -> ::futures::StartSend<bft::Communication, E> {
let network_message = match message { let network_message = net::LocalizedBftMessage {
bft::generic::Communication::Consensus(c) => net::BftMessage::Consensus(match c { message: match message {
bft::generic::LocalizedMessage::Propose(proposal) => net::SignedConsensusMessage::Propose(net::SignedConsensusProposal { bft::generic::Communication::Consensus(c) => net::BftMessage::Consensus(match c {
round_number: proposal.round_number as u32, bft::generic::LocalizedMessage::Propose(proposal) => net::SignedConsensusMessage::Propose(net::SignedConsensusProposal {
proposal: proposal.proposal, round_number: proposal.round_number as u32,
digest: proposal.digest, proposal: proposal.proposal,
sender: proposal.sender, digest: proposal.digest,
digest_signature: proposal.digest_signature.signature, sender: proposal.sender,
full_signature: proposal.full_signature.signature, digest_signature: proposal.digest_signature.signature,
full_signature: proposal.full_signature.signature,
}),
bft::generic::LocalizedMessage::Vote(vote) => net::SignedConsensusMessage::Vote(net::SignedConsensusVote {
sender: vote.sender,
signature: vote.signature.signature,
vote: match vote.vote {
bft::generic::Vote::Prepare(r, h) => net::ConsensusVote::Prepare(r as u32, h),
bft::generic::Vote::Commit(r, h) => net::ConsensusVote::Commit(r as u32, h),
bft::generic::Vote::AdvanceRound(r) => net::ConsensusVote::AdvanceRound(r as u32),
}
}),
}), }),
bft::generic::LocalizedMessage::Vote(vote) => net::SignedConsensusMessage::Vote(net::SignedConsensusVote { bft::generic::Communication::Auxiliary(justification) => net::BftMessage::Auxiliary(justification.uncheck().into()),
sender: vote.sender, },
signature: vote.signature.signature, parent_hash: self.parent_hash,
vote: match vote.vote {
bft::generic::Vote::Prepare(r, h) => net::ConsensusVote::Prepare(r as u32, h),
bft::generic::Vote::Commit(r, h) => net::ConsensusVote::Commit(r as u32, h),
bft::generic::Vote::AdvanceRound(r) => net::ConsensusVote::AdvanceRound(r as u32),
}
}),
}),
bft::generic::Communication::Auxiliary(justification) => net::BftMessage::Auxiliary(justification.uncheck().into()),
}; };
self.network.send_bft_message(network_message); self.network.send_bft_message(network_message);
Ok(::futures::AsyncSink::Ready) Ok(::futures::AsyncSink::Ready)
@@ -134,17 +225,49 @@ pub struct Service {
struct Network(Arc<net::ConsensusService>); struct Network(Arc<net::ConsensusService>);
fn start_bft<F, C>(
header: &Header,
handle: reactor::Handle,
client: &bft::Authorities,
network: Arc<net::ConsensusService>,
bft_service: &BftService<F, C>,
messages: SharedMessageCollection
) where
F: bft::ProposerFactory + 'static,
C: bft::BlockImport + bft::Authorities + 'static,
<F as bft::ProposerFactory>::Error: ::std::fmt::Debug,
<F::Proposer as bft::Proposer>::Error: ::std::fmt::Display + Into<error::Error>,
{
let hash = header.blake2_256().into();
if bft_service.live_agreement().map_or(false, |h| h == hash) {
return;
}
let authorities = match client.authorities(&BlockId::Hash(hash)) {
Ok(authorities) => authorities,
Err(e) => {
debug!("Error reading authorities: {:?}", e);
return;
}
};
let input = messages.select(hash, network.bft_messages(), authorities).map_err(|e| e.into());
let output = BftSink { network: network, parent_hash: hash.clone(), _e: Default::default() };
match bft_service.build_upon(&header, input, output) {
Ok(Some(bft)) => handle.spawn(bft),
Ok(None) => {},
Err(e) => debug!("BFT agreement error: {:?}", e),
}
}
impl Service { impl Service {
/// Create and start a new instance. /// Create and start a new instance.
pub fn new<C>( pub fn new<C>(
client: Arc<C>, client: Arc<C>,
network: Arc<net::ConsensusService>, network: Arc<net::ConsensusService>,
transaction_pool: Arc<Mutex<TransactionPool>>, transaction_pool: Arc<Mutex<TransactionPool>>,
key: ed25519::Pair, key: ed25519::Pair
best_header: &Header) -> Service ) -> Service
where C: BlockchainEvents + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static where C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static
{ {
let best_header = best_header.clone();
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
let mut core = reactor::Core::new().expect("tokio::Core could not be created"); let mut core = reactor::Core::new().expect("tokio::Core could not be created");
let key = Arc::new(key); let key = Arc::new(key);
@@ -153,31 +276,44 @@ impl Service {
transaction_pool: transaction_pool.clone(), transaction_pool: transaction_pool.clone(),
network: Network(network.clone()), network: Network(network.clone()),
}; };
let bft_service = BftService::new(client.clone(), key, factory); let messages = SharedMessageCollection::new();
let build_bft = |header: &Header| -> Result<_, Error> { let bft_service = Arc::new(BftService::new(client.clone(), key, factory));
let hash = header.blake2_256().into();
let authorities = client.authorities(&BlockId::Hash(hash))?; let handle = core.handle();
let input = network.bft_messages() let notifications = client.import_notification_stream().for_each(|notification| {
.filter_map(move |message| { if notification.is_new_best {
process_message(message, &authorities, hash.clone()) start_bft(&notification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone());
.map_err(|e| debug!("Message validation failed: {:?}", e)) }
.ok() Ok(())
}) });
.map_err(|_| bft::InputStreamConcluded.into());
let output = BftSink { network: network.clone(), _e: Default::default() }; let interval = reactor::Interval::new_at(Instant::now() + Duration::from_millis(TIMER_DELAY_MS), Duration::from_millis(TIMER_INTERVAL_MS), &handle).unwrap();
Ok(bft_service.build_upon(&header, input, output)?) let mut prev_best = match client.best_block_header() {
Ok(header) => header.blake2_256(),
Err(e) => {
warn!("Cant's start consensus service. Error reading best block header: {:?}", e);
return;
}
}; };
// Kickstart BFT agreement on start. let c = client.clone();
if let Err(e) = build_bft(&best_header) let s = bft_service.clone();
.map_err(|e| debug!("Error creating initial BFT agreement: {:?}", e)) let n = network.clone();
.and_then(|bft| core.run(bft)) let m = messages.clone();
{ let handle = core.handle();
debug!("Error starting initial BFT agreement: {:?}", e); let timed = interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
} if let Ok(best_block) = c.best_block_header() {
let bft = client.import_notification_stream().and_then(|notification| { let hash = best_block.blake2_256();
build_bft(&notification.header).map_err(|e| debug!("BFT agreement error: {:?}", e)) m.collect_garbage();
}).for_each(|f| f); if hash == prev_best {
if let Err(e) = core.run(bft) { debug!("Starting consensus round after a timeout");
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone());
}
prev_best = hash;
}
Ok(())
});
core.handle().spawn(timed);
if let Err(e) = core.run(notifications) {
debug!("BFT event loop error {:?}", e); debug!("BFT event loop error {:?}", e);
} }
}); });
@@ -235,3 +371,4 @@ impl TableRouter for Router {
future::ok(Extrinsic) future::ok(Extrinsic)
} }
} }
+1 -1
View File
@@ -146,7 +146,7 @@ pub type UncheckedExtrinsic = generic::UncheckedExtrinsic<AccountId, Index, Call
pub type Extrinsic = generic::Extrinsic<AccountId, Index, Call>; pub type Extrinsic = generic::Extrinsic<AccountId, Index, Call>;
/// Executive: handles dispatch to the various modules. /// Executive: handles dispatch to the various modules.
pub type Executive = executive::Executive<Concrete, Block, Staking, pub type Executive = executive::Executive<Concrete, Block, Staking,
((((((), Parachains), Council), Democracy), Staking), Session)>; (((((((), Parachains), Council), Democracy), Staking), Session), Timestamp)>;
impl_outer_config! { impl_outer_config! {
pub struct GenesisConfig for Concrete { pub struct GenesisConfig for Concrete {
+1 -1
View File
@@ -44,7 +44,7 @@ impl<T: Trait> Module<T> {
pub fn calculate_duty_roster() -> DutyRoster { pub fn calculate_duty_roster() -> DutyRoster {
let parachain_count = Self::count(); let parachain_count = Self::count();
let validator_count = <session::Module<T>>::validator_count(); let validator_count = <session::Module<T>>::validator_count();
let validators_per_parachain = (validator_count - 1) / parachain_count; let validators_per_parachain = if parachain_count != 0 { (validator_count - 1) / parachain_count } else { 0 };
let mut roles_val = (0..validator_count).map(|i| match i { let mut roles_val = (0..validator_count).map(|i| match i {
i if i < parachain_count * validators_per_parachain => i if i < parachain_count * validators_per_parachain =>
Binary file not shown.
-1
View File
@@ -7,7 +7,6 @@ authors = ["Parity Technologies <admin@parity.io>"]
futures = "0.1.17" futures = "0.1.17"
parking_lot = "0.4" parking_lot = "0.4"
tokio-timer = "0.1.2" tokio-timer = "0.1.2"
hex-literal = "0.1"
error-chain = "0.11" error-chain = "0.11"
log = "0.4" log = "0.4"
tokio-core = "0.1.12" tokio-core = "0.1.12"
+12 -9
View File
@@ -37,8 +37,6 @@ extern crate substrate_executor;
extern crate tokio_core; extern crate tokio_core;
extern crate substrate_client as client; extern crate substrate_client as client;
#[macro_use]
extern crate hex_literal;
#[macro_use] #[macro_use]
extern crate error_chain; extern crate error_chain;
#[macro_use] #[macro_use]
@@ -141,8 +139,12 @@ impl Service {
} }
let god_keys = vec![ let god_keys = vec![
hex!["f09c0d1467d6952c92c343672bfb06a24560f400af8cf98b93df7d40b4efe1b6"], ed25519::Pair::from_seed(b"Alice ").public().into(),
hex!["84718cd2894bcda83beeca3a7842caf269fe93cacde0bdee0e3cbce6de253f0e"] ed25519::Pair::from_seed(b"Bob ").public().into(),
// ed25519::Pair::from_seed(b"Charlie ").public().into(),
// ed25519::Pair::from_seed(b"Dave ").public().into(),
// ed25519::Pair::from_seed(b"Eve ").public().into(),
// ed25519::Pair::from_seed(b"Ferdie ").public().into(),
]; ];
let genesis_config = GenesisConfig { let genesis_config = GenesisConfig {
@@ -190,15 +192,16 @@ impl Service {
let prepare_genesis = || { let prepare_genesis = || {
storage = genesis_config.build_externalities(); storage = genesis_config.build_externalities();
let block = genesis::construct_genesis_block(&storage); let block = genesis::construct_genesis_block(&storage);
with_externalities(&mut storage, || with_externalities(&mut storage, || {
// TODO: use api.rs to dispatch instead // TODO: use api.rs to dispatch instead
polkadot_runtime::System::initialise_genesis_state(&block.header) polkadot_runtime::System::initialise_genesis_state(&block.header);
); info!("Genesis header hash: {}", polkadot_runtime::System::block_hash(0));
});
(primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) (primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
}; };
let client = Arc::new(client::new_in_mem(executor, prepare_genesis)?); let client = Arc::new(client::new_in_mem(executor, prepare_genesis)?);
let best_header = client.header(&BlockId::Hash(client.info()?.chain.best_hash))?.expect("Best header always exists; qed"); let best_header = client.best_block_header()?;
info!("Starting Polkadot. Best block is #{}", best_header.number); info!("Starting Polkadot. Best block is #{}", best_header.number);
let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool))); let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool)));
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
@@ -220,7 +223,7 @@ impl Service {
// Load the first available key. Code above makes sure it exisis. // Load the first available key. Code above makes sure it exisis.
let key = keystore.load(&keystore.contents()?[0], "")?; let key = keystore.load(&keystore.contents()?[0], "")?;
info!("Using authority key {:?}", key.public()); info!("Using authority key {:?}", key.public());
Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool.clone(), key, &best_header)) Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool.clone(), key))
} else { } else {
None None
}; };
+8 -1
View File
@@ -22,9 +22,16 @@ extern crate polkadot_cli as cli;
#[macro_use] #[macro_use]
extern crate error_chain; extern crate error_chain;
extern crate ctrlc;
use std::sync::mpsc;
quick_main!(run); quick_main!(run);
fn run() -> cli::error::Result<()> { fn run() -> cli::error::Result<()> {
cli::run(::std::env::args()) let (exit_send, exit_receive) = mpsc::channel();
ctrlc::CtrlC::set_handler(move || {
exit_send.send(()).expect("Error sending exit notification");
});
cli::run(::std::env::args(), exit_receive)
} }
@@ -63,10 +63,10 @@ impl PolkadotBlock {
return Err(unchecked); return Err(unchecked);
} }
match unchecked.extrinsics[0].extrinsic.function { match unchecked.extrinsics[0].extrinsic.function {
Call::Timestamp(TimestampCall::set(_)) => return Err(unchecked), Call::Timestamp(TimestampCall::set(_)) => {},
_ => {} _ => return Err(unchecked),
} }
// any further checks... // any further checks...
Ok(PolkadotBlock { block: unchecked, location: None }) Ok(PolkadotBlock { block: unchecked, location: None })
} }
+1
View File
@@ -12,6 +12,7 @@ ed25519 = { path = "../ed25519" }
tokio-timer = "0.1.2" tokio-timer = "0.1.2"
parking_lot = "0.4" parking_lot = "0.4"
error-chain = "0.11" error-chain = "0.11"
log = "0.4"
[dev-dependencies] [dev-dependencies]
substrate-keyring = { path = "../keyring" } substrate-keyring = { path = "../keyring" }
+46 -24
View File
@@ -26,13 +26,16 @@ extern crate ed25519;
extern crate tokio_timer; extern crate tokio_timer;
extern crate parking_lot; extern crate parking_lot;
#[macro_use]
extern crate log;
#[macro_use] #[macro_use]
extern crate futures; extern crate futures;
#[macro_use] #[macro_use]
extern crate error_chain; extern crate error_chain;
use std::collections::HashMap; use std::mem;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@@ -235,6 +238,7 @@ pub struct BftFuture<P, I, InStream, OutSink> where
impl<P, I, InStream, OutSink> Future for BftFuture<P, I, InStream, OutSink> where impl<P, I, InStream, OutSink> Future for BftFuture<P, I, InStream, OutSink> where
P: Proposer, P: Proposer,
P::Error: ::std::fmt::Display,
I: BlockImport, I: BlockImport,
InStream: Stream<Item=Communication, Error=P::Error>, InStream: Stream<Item=Communication, Error=P::Error>,
OutSink: Sink<SinkItem=Communication, SinkError=P::Error>, OutSink: Sink<SinkItem=Communication, SinkError=P::Error>,
@@ -254,11 +258,16 @@ impl<P, I, InStream, OutSink> Future for BftFuture<P, I, InStream, OutSink> wher
} }
// TODO: handle this error, at least by logging. // TODO: handle this error, at least by logging.
let committed = try_ready!(self.inner.poll().map_err(|_| ())); let committed = try_ready!(self.inner.poll().map_err(|e| {
warn!(target: "bft", "Error in BFT agreement: {}", e);
}));
// If we didn't see the proposal (very unlikely), // If we didn't see the proposal (very unlikely),
// we will get the block from the network later. // we will get the block from the network later.
if let Some(justified_block) = committed.candidate { if let Some(justified_block) = committed.candidate {
info!(target: "bft", "Importing block #{} ({}) directly from BFT consensus",
justified_block.header.number, HeaderHash::from(justified_block.header.blake2_256()));
self.import.import_block(justified_block, committed.justification) self.import.import_block(justified_block, committed.justification)
} }
@@ -302,7 +311,7 @@ impl Drop for AgreementHandle {
/// is notified of. /// is notified of.
pub struct BftService<P, I> { pub struct BftService<P, I> {
client: Arc<I>, client: Arc<I>,
live_agreements: Mutex<HashMap<HeaderHash, AgreementHandle>>, live_agreement: Mutex<Option<(HeaderHash, AgreementHandle)>>,
timer: Timer, timer: Timer,
round_timeout_multiplier: u64, round_timeout_multiplier: u64,
key: Arc<ed25519::Pair>, // TODO: key changing over time. key: Arc<ed25519::Pair>, // TODO: key changing over time.
@@ -312,6 +321,7 @@ pub struct BftService<P, I> {
impl<P, I> BftService<P, I> impl<P, I> BftService<P, I>
where where
P: ProposerFactory, P: ProposerFactory,
<P::Proposer as Proposer>::Error: ::std::fmt::Display,
I: BlockImport + Authorities, I: BlockImport + Authorities,
{ {
@@ -319,7 +329,7 @@ impl<P, I> BftService<P, I>
pub fn new(client: Arc<I>, key: Arc<ed25519::Pair>, factory: P) -> BftService<P, I> { pub fn new(client: Arc<I>, key: Arc<ed25519::Pair>, factory: P) -> BftService<P, I> {
BftService { BftService {
client: client, client: client,
live_agreements: Mutex::new(HashMap::new()), live_agreement: Mutex::new(None),
timer: Timer::default(), timer: Timer::default(),
round_timeout_multiplier: 4, round_timeout_multiplier: 4,
key: key, // TODO: key changing over time. key: key, // TODO: key changing over time.
@@ -331,13 +341,16 @@ impl<P, I> BftService<P, I>
/// ///
/// If the local signing key is an authority, this will begin the consensus process to build a /// If the local signing key is an authority, this will begin the consensus process to build a
/// block on top of it. If the executor fails to run the future, an error will be returned. /// block on top of it. If the executor fails to run the future, an error will be returned.
/// Returns `None` if the agreement on the block with given parent is already in progress.
pub fn build_upon<InStream, OutSink>(&self, header: &Header, input: InStream, output: OutSink) pub fn build_upon<InStream, OutSink>(&self, header: &Header, input: InStream, output: OutSink)
-> Result<BftFuture<<P as ProposerFactory>::Proposer, I, InStream, OutSink>, P::Error> where -> Result<Option<BftFuture<<P as ProposerFactory>::Proposer, I, InStream, OutSink>>, P::Error> where
InStream: Stream<Item=Communication, Error=<<P as ProposerFactory>::Proposer as Proposer>::Error>, InStream: Stream<Item=Communication, Error=<<P as ProposerFactory>::Proposer as Proposer>::Error>,
OutSink: Sink<SinkItem=Communication, SinkError=<<P as ProposerFactory>::Proposer as Proposer>::Error>, OutSink: Sink<SinkItem=Communication, SinkError=<<P as ProposerFactory>::Proposer as Proposer>::Error>,
{ {
let hash = header.blake2_256().into(); let hash = header.blake2_256().into();
let mut _preempted_consensus = None; // defers drop of live to the end. if self.live_agreement.lock().as_ref().map_or(false, |&(h, _)| h == hash) {
return Ok(None);
}
let authorities = self.client.authorities(&BlockId::Hash(hash))?; let authorities = self.client.authorities(&BlockId::Hash(hash))?;
@@ -347,7 +360,8 @@ impl<P, I> BftService<P, I>
let local_id = self.key.public().0; let local_id = self.key.public().0;
if !authorities.contains(&local_id) { if !authorities.contains(&local_id) {
self.live_agreements.lock().remove(&header.parent_hash); // cancel current agreement
self.live_agreement.lock().take();
Err(From::from(ErrorKind::InvalidAuthority(local_id)))?; Err(From::from(ErrorKind::InvalidAuthority(local_id)))?;
} }
@@ -373,25 +387,33 @@ impl<P, I> BftService<P, I>
let cancel = Arc::new(AtomicBool::new(false)); let cancel = Arc::new(AtomicBool::new(false));
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
{ // cancel current agreement.
let mut live = self.live_agreements.lock(); // defers drop of live to the end.
live.insert(hash, AgreementHandle { let _preempted_consensus = {
mem::replace(&mut *self.live_agreement.lock(), Some((hash, AgreementHandle {
task: Some(rx), task: Some(rx),
cancel: cancel.clone(), cancel: cancel.clone(),
}); })))
};
// cancel any agreements attempted to build upon this block's parent Ok(Some(BftFuture {
// as clearly agreement has already been reached.
_preempted_consensus = live.remove(&header.parent_hash);
}
Ok(BftFuture {
inner: agreement, inner: agreement,
cancel: cancel, cancel: cancel,
send_task: Some(tx), send_task: Some(tx),
import: self.client.clone(), import: self.client.clone(),
}) }))
} }
/// Cancel current agreement if any.
pub fn cancel_agreement(&self) {
self.live_agreement.lock().take();
}
/// Get current agreement parent hash if any.
pub fn live_agreement(&self) -> Option<HeaderHash> {
self.live_agreement.lock().as_ref().map(|&(h, _)| h.clone())
}
} }
/// Given a total number of authorities, yield the maximum faulty that would be allowed. /// Given a total number of authorities, yield the maximum faulty that would be allowed.
@@ -634,7 +656,7 @@ mod tests {
{ {
BftService { BftService {
client: Arc::new(client), client: Arc::new(client),
live_agreements: Mutex::new(HashMap::new()), live_agreement: Mutex::new(None),
timer: Timer::default(), timer: Timer::default(),
round_timeout_multiplier: 4, round_timeout_multiplier: 4,
key: Arc::new(Keyring::One.into()), key: Arc::new(Keyring::One.into()),
@@ -673,17 +695,17 @@ mod tests {
let second_hash = second.blake2_256().into(); let second_hash = second.blake2_256().into();
let bft = service.build_upon(&first, stream::empty(), Output(Default::default())).unwrap(); let bft = service.build_upon(&first, stream::empty(), Output(Default::default())).unwrap();
assert!(service.live_agreements.lock().contains_key(&first_hash)); assert!(service.live_agreement.lock().as_ref().unwrap().0 == first_hash);
// turn the core so the future gets polled and sends its task to the // turn the core so the future gets polled and sends its task to the
// service. otherwise it deadlocks. // service. otherwise it deadlocks.
core.handle().execute(bft).unwrap(); core.handle().execute(bft.unwrap()).unwrap();
core.turn(Some(::std::time::Duration::from_millis(100))); core.turn(Some(::std::time::Duration::from_millis(100)));
let bft = service.build_upon(&second, stream::empty(), Output(Default::default())).unwrap(); let bft = service.build_upon(&second, stream::empty(), Output(Default::default())).unwrap();
assert!(!service.live_agreements.lock().contains_key(&first_hash)); assert!(service.live_agreement.lock().as_ref().unwrap().0 != first_hash);
assert!(service.live_agreements.lock().contains_key(&second_hash)); assert!(service.live_agreement.lock().as_ref().unwrap().0 == second_hash);
core.handle().execute(bft).unwrap(); core.handle().execute(bft.unwrap()).unwrap();
core.turn(Some(::std::time::Duration::from_millis(100))); core.turn(Some(::std::time::Duration::from_millis(100)));
} }
+23
View File
@@ -45,6 +45,12 @@ pub trait BlockchainEvents {
fn import_notification_stream(&self) -> BlockchainEventStream; fn import_notification_stream(&self) -> BlockchainEventStream;
} }
/// Chain head information.
pub trait ChainHead {
/// Get best block header.
fn best_block_header(&self) -> Result<block::Header, error::Error>;
}
/// Client info /// Client info
// TODO: split queue info from chain info and amalgamate into single struct. // TODO: split queue info from chain info and amalgamate into single struct.
#[derive(Debug)] #[derive(Debug)]
@@ -377,6 +383,12 @@ impl<B, E> Client<B, E> where
pub fn justification(&self, id: &BlockId) -> error::Result<Option<primitives::bft::Justification>> { pub fn justification(&self, id: &BlockId) -> error::Result<Option<primitives::bft::Justification>> {
self.backend.blockchain().justification(*id) self.backend.blockchain().justification(*id)
} }
/// Get best block header.
pub fn best_block_header(&self) -> error::Result<block::Header> {
let info = self.backend.blockchain().info().map_err(|e| error::Error::from_blockchain(Box::new(e)))?;
Ok(self.header(&BlockId::Hash(info.best_hash))?.expect("Best block header must always exist"))
}
} }
impl<B, E> bft::BlockImport for Client<B, E> impl<B, E> bft::BlockImport for Client<B, E>
@@ -420,6 +432,17 @@ impl<B, E> BlockchainEvents for Client<B, E>
} }
} }
impl<B, E> ChainHead for Client<B, E>
where
B: backend::Backend,
E: state_machine::CodeExecutor,
error::Error: From<<B::State as state_machine::backend::Backend>::Error>
{
fn best_block_header(&self) -> error::Result<block::Header> {
Client::best_block_header(self)
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
+1 -1
View File
@@ -44,6 +44,6 @@ pub mod genesis;
pub mod block_builder; pub mod block_builder;
mod client; mod client;
pub use client::{Client, ClientInfo, CallResult, ImportResult, pub use client::{Client, ClientInfo, CallResult, ImportResult, ChainHead,
BlockStatus, BlockOrigin, new_in_mem, BlockchainEventStream, BlockchainEvents}; BlockStatus, BlockOrigin, new_in_mem, BlockchainEventStream, BlockchainEvents};
pub use blockchain::Info as ChainInfo; pub use blockchain::Info as ChainInfo;
+39 -20
View File
@@ -46,8 +46,8 @@ pub struct Consensus {
peers: HashMap<PeerId, PeerConsensus>, peers: HashMap<PeerId, PeerConsensus>,
our_candidate: Option<(Hash, Vec<u8>)>, our_candidate: Option<(Hash, Vec<u8>)>,
statement_sink: Option<mpsc::UnboundedSender<message::Statement>>, statement_sink: Option<mpsc::UnboundedSender<message::Statement>>,
bft_message_sink: Option<mpsc::UnboundedSender<message::BftMessage>>, bft_message_sink: Option<mpsc::UnboundedSender<message::LocalizedBftMessage>>,
message_timestamps: HashMap<Hash, Instant>, messages: HashMap<Hash, (Instant, message::Message)>,
} }
impl Consensus { impl Consensus {
@@ -58,7 +58,7 @@ impl Consensus {
our_candidate: None, our_candidate: None,
statement_sink: None, statement_sink: None,
bft_message_sink: None, bft_message_sink: None,
message_timestamps: Default::default(), messages: Default::default(),
} }
} }
@@ -69,13 +69,20 @@ impl Consensus {
} }
/// Handle new connected peer. /// Handle new connected peer.
pub fn new_peer(&mut self, _io: &mut SyncIo, _protocol: &Protocol, peer_id: PeerId, roles: &[message::Role]) { pub fn new_peer(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, roles: &[message::Role]) {
if roles.iter().any(|r| *r == message::Role::Validator) { if roles.iter().any(|r| *r == message::Role::Validator) {
trace!(target:"sync", "Registering validator {}", peer_id); trace!(target:"sync", "Registering validator {}", peer_id);
// Send out all known messages.
// TODO: limit by size
let mut known_messages = HashSet::new();
for (hash, &(_, ref m)) in self.messages.iter() {
known_messages.insert(hash.clone());
protocol.send_message(io, peer_id, m.clone());
}
self.peers.insert(peer_id, PeerConsensus { self.peers.insert(peer_id, PeerConsensus {
candidate_fetch: None, candidate_fetch: None,
candidate_available: None, candidate_available: None,
known_messages: Default::default(), known_messages,
}); });
} }
} }
@@ -88,13 +95,16 @@ impl Consensus {
} }
} }
fn register_message(&mut self, hash: Hash) { fn register_message(&mut self, hash: Hash, message: message::Message) {
if let Entry::Vacant(entry) = self.message_timestamps.entry(hash) { if let Entry::Vacant(entry) = self.messages.entry(hash) {
entry.insert(Instant::now()); entry.insert((Instant::now(), message));
} }
} }
pub fn on_statement(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, statement: message::Statement, hash: Hash) { pub fn on_statement(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, statement: message::Statement, hash: Hash) {
if self.messages.contains_key(&hash) {
trace!(target:"sync", "Ignored already known statement from {}", peer_id);
}
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
// TODO: validate signature? // TODO: validate signature?
match &statement.statement { match &statement.statement {
@@ -114,9 +124,10 @@ impl Consensus {
trace!(target:"sync", "Ignored statement from unregistered peer {}", peer_id); trace!(target:"sync", "Ignored statement from unregistered peer {}", peer_id);
return; return;
} }
self.register_message(hash.clone()); let message = Message::Statement(statement);
self.register_message(hash.clone(), message.clone());
// Propagate to other peers. // Propagate to other peers.
self.propagate(io, protocol, Message::Statement(statement), hash); self.propagate(io, protocol, message, hash);
} }
pub fn statements(&mut self) -> mpsc::UnboundedReceiver<message::Statement>{ pub fn statements(&mut self) -> mpsc::UnboundedReceiver<message::Statement>{
@@ -125,7 +136,10 @@ impl Consensus {
stream stream
} }
pub fn on_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, message: message::BftMessage, hash: Hash) { pub fn on_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, message: message::LocalizedBftMessage, hash: Hash) {
if self.messages.contains_key(&hash) {
trace!(target:"sync", "Ignored already known BFT message from {}", peer_id);
}
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
peer.known_messages.insert(hash); peer.known_messages.insert(hash);
// TODO: validate signature? // TODO: validate signature?
@@ -140,12 +154,13 @@ impl Consensus {
trace!(target:"sync", "Ignored BFT statement from unregistered peer {}", peer_id); trace!(target:"sync", "Ignored BFT statement from unregistered peer {}", peer_id);
return; return;
} }
self.register_message(hash.clone()); let message = Message::BftMessage(message);
self.register_message(hash.clone(), message.clone());
// Propagate to other peers. // Propagate to other peers.
self.propagate(io, protocol, Message::BftMessage(message), hash); self.propagate(io, protocol, message, hash);
} }
pub fn bft_messages(&mut self) -> mpsc::UnboundedReceiver<message::BftMessage>{ pub fn bft_messages(&mut self) -> mpsc::UnboundedReceiver<message::LocalizedBftMessage>{
let (sink, stream) = mpsc::unbounded(); let (sink, stream) = mpsc::unbounded();
self.bft_message_sink = Some(sink); self.bft_message_sink = Some(sink);
stream stream
@@ -180,16 +195,16 @@ impl Consensus {
trace!(target:"sync", "Broadcasting statement {:?}", statement); trace!(target:"sync", "Broadcasting statement {:?}", statement);
let message = Message::Statement(statement); let message = Message::Statement(statement);
let hash = Protocol::hash_message(&message); let hash = Protocol::hash_message(&message);
self.register_message(hash.clone()); self.register_message(hash.clone(), message.clone());
self.propagate(io, protocol, message, hash); self.propagate(io, protocol, message, hash);
} }
pub fn send_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, message: message::BftMessage) { pub fn send_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, message: message::LocalizedBftMessage) {
// Broadcast message to all validators. // Broadcast message to all validators.
trace!(target:"sync", "Broadcasting BFT message {:?}", message); trace!(target:"sync", "Broadcasting BFT message {:?}", message);
let message = Message::BftMessage(message); let message = Message::BftMessage(message);
let hash = Protocol::hash_message(&message); let hash = Protocol::hash_message(&message);
self.register_message(hash.clone()); self.register_message(hash.clone(), message.clone());
self.propagate(io, protocol, message, hash); self.propagate(io, protocol, message, hash);
} }
@@ -237,10 +252,14 @@ impl Consensus {
pub fn collect_garbage(&mut self) { pub fn collect_garbage(&mut self) {
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS); let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS);
let now = Instant::now(); let now = Instant::now();
self.message_timestamps.retain(|_, timestamp| *timestamp + expiration < now); let before = self.messages.len();
let timestamps = &self.message_timestamps; self.messages.retain(|_, &mut (timestamp, _)| timestamp < now + expiration);
if self.messages.len() != before {
trace!(target:"sync", "Cleaned up {} stale messages", before - self.messages.len());
}
let messages = &self.messages;
for (_, ref mut peer) in self.peers.iter_mut() { for (_, ref mut peer) in self.peers.iter_mut() {
peer.known_messages.retain(|h| timestamps.contains_key(h)); peer.known_messages.retain(|h| messages.contains_key(h));
} }
} }
} }
+1 -1
View File
@@ -62,7 +62,7 @@ pub use service::{Service, FetchFuture, StatementStream, ConsensusService, BftMe
pub use protocol::{ProtocolStatus}; pub use protocol::{ProtocolStatus};
pub use sync::{Status as SyncStatus, SyncState}; pub use sync::{Status as SyncStatus, SyncState};
pub use network::{NonReservedPeerMode, ConnectionFilter, ConnectionDirection, NetworkConfiguration}; pub use network::{NonReservedPeerMode, ConnectionFilter, ConnectionDirection, NetworkConfiguration};
pub use message::{Statement, BftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal}; pub use message::{Statement, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal};
pub use error::Error; pub use error::Error;
pub use config::{Role, ProtocolConfig}; pub use config::{Role, ProtocolConfig};
+13 -1
View File
@@ -149,6 +149,8 @@ pub enum UnsignedStatement {
/// A signed statement. /// A signed statement.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct Statement { pub struct Statement {
/// Parent relay chain block header hash.
pub parent_hash: HeaderHash,
/// The statement. /// The statement.
pub statement: UnsignedStatement, pub statement: UnsignedStatement,
/// The signature. /// The signature.
@@ -157,6 +159,7 @@ pub struct Statement {
pub sender: AuthorityId, pub sender: AuthorityId,
} }
/// Communication that can occur between participants in consensus. /// Communication that can occur between participants in consensus.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum BftMessage { pub enum BftMessage {
@@ -166,6 +169,15 @@ pub enum BftMessage {
Auxiliary(Justification), Auxiliary(Justification),
} }
/// BFT Consensus message with parent header hash attached to it.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct LocalizedBftMessage {
/// Consensus message.
pub message: BftMessage,
/// Parent header hash.
pub parent_hash: HeaderHash,
}
/// A localized proposal message. Contains two signed pieces of data. /// A localized proposal message. Contains two signed pieces of data.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct SignedConsensusProposal { pub struct SignedConsensusProposal {
@@ -233,7 +245,7 @@ pub enum Message {
/// Candidate response. /// Candidate response.
CandidateResponse(CandidateResponse), CandidateResponse(CandidateResponse),
/// BFT Consensus statement. /// BFT Consensus statement.
BftMessage(BftMessage), BftMessage(LocalizedBftMessage),
} }
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
+2 -2
View File
@@ -303,13 +303,13 @@ impl Protocol {
self.consensus.lock().on_statement(io, self, peer, statement, hash); self.consensus.lock().on_statement(io, self, peer, statement, hash);
} }
fn on_bft_message(&self, io: &mut SyncIo, peer: PeerId, message: message::BftMessage, hash: Hash) { fn on_bft_message(&self, io: &mut SyncIo, peer: PeerId, message: message::LocalizedBftMessage, hash: Hash) {
trace!(target: "sync", "BFT message from {}: {:?}", peer, message); trace!(target: "sync", "BFT message from {}: {:?}", peer, message);
self.consensus.lock().on_bft_message(io, self, peer, message, hash); self.consensus.lock().on_bft_message(io, self, peer, message, hash);
} }
/// See `ConsensusService` trait. /// See `ConsensusService` trait.
pub fn send_bft_message(&self, io: &mut SyncIo, message: message::BftMessage) { pub fn send_bft_message(&self, io: &mut SyncIo, message: message::LocalizedBftMessage) {
self.consensus.lock().send_bft_message(io, self, message) self.consensus.lock().send_bft_message(io, self, message)
} }
+4 -4
View File
@@ -28,7 +28,7 @@ use protocol::{Protocol, ProtocolStatus, PeerInfo as ProtocolPeerInfo, Transacti
use config::{ProtocolConfig}; use config::{ProtocolConfig};
use error::Error; use error::Error;
use chain::Client; use chain::Client;
use message::{Statement, BftMessage}; use message::{Statement, LocalizedBftMessage};
/// Polkadot devp2p protocol id /// Polkadot devp2p protocol id
pub const DOT_PROTOCOL_ID: ProtocolId = *b"dot"; pub const DOT_PROTOCOL_ID: ProtocolId = *b"dot";
@@ -38,7 +38,7 @@ pub type FetchFuture = oneshot::Receiver<Vec<u8>>;
/// Type that represents statement stream. /// Type that represents statement stream.
pub type StatementStream = mpsc::UnboundedReceiver<Statement>; pub type StatementStream = mpsc::UnboundedReceiver<Statement>;
/// Type that represents bft messages stream. /// Type that represents bft messages stream.
pub type BftMessageStream = mpsc::UnboundedReceiver<BftMessage>; pub type BftMessageStream = mpsc::UnboundedReceiver<LocalizedBftMessage>;
bitflags! { bitflags! {
/// Node roles bitmask. /// Node roles bitmask.
@@ -93,7 +93,7 @@ pub trait ConsensusService: Send + Sync {
/// Get BFT message stream. /// Get BFT message stream.
fn bft_messages(&self) -> BftMessageStream; fn bft_messages(&self) -> BftMessageStream;
/// Send out a BFT message. /// Send out a BFT message.
fn send_bft_message(&self, message: BftMessage); fn send_bft_message(&self, message: LocalizedBftMessage);
} }
/// devp2p Protocol handler /// devp2p Protocol handler
@@ -255,7 +255,7 @@ impl ConsensusService for Service {
self.handler.protocol.bft_messages() self.handler.protocol.bft_messages()
} }
fn send_bft_message(&self, message: BftMessage) { fn send_bft_message(&self, message: LocalizedBftMessage) {
self.network.with_context(DOT_PROTOCOL_ID, |context| { self.network.with_context(DOT_PROTOCOL_ID, |context| {
self.handler.protocol.send_bft_message(&mut NetSyncIo::new(context), message); self.handler.protocol.send_bft_message(&mut NetSyncIo::new(context), message);
}); });
@@ -50,6 +50,13 @@ use runtime_support::StorageValue;
use primitives::traits::{self, Header, Zero, One, Checkable, Applyable, CheckEqual, Executable, MakePayment}; use primitives::traits::{self, Header, Zero, One, Checkable, Applyable, CheckEqual, Executable, MakePayment};
use codec::Slicable; use codec::Slicable;
/// Compute the extrinsics root of a list of extrinsics.
pub fn extrinsics_root<H: Hashing, E: Slicable>(extrinsics: &[E]) -> H::Output {
let xts = extrinsics.iter().map(Slicable::encode).collect::<Vec<_>>();
let xts = xts.iter().map(Vec::as_slice).collect::<Vec<_>>();
H::enumerated_trie_root(&xts)
}
pub struct Executive< pub struct Executive<
System, System,
Block, Block,
@@ -82,11 +89,9 @@ impl<
); );
// check transaction trie root represents the transactions. // check transaction trie root represents the transactions.
let txs = block.extrinsics().iter().map(Slicable::encode).collect::<Vec<_>>(); let xts_root = extrinsics_root::<System::Hashing, _>(&block.extrinsics());
let txs = txs.iter().map(Vec::as_slice).collect::<Vec<_>>(); header.extrinsics_root().check_equal(&xts_root);
let txs_root = System::Hashing::enumerated_trie_root(&txs); assert!(header.extrinsics_root() == &xts_root, "Transaction trie root must be valid.");
header.extrinsics_root().check_equal(&txs_root);
assert!(header.extrinsics_root() == &txs_root, "Transaction trie root must be valid.");
} }
/// Actually execute all transitioning for `block`. /// Actually execute all transitioning for `block`.
@@ -164,7 +169,7 @@ impl<
fn post_finalise(header: &System::Header) { fn post_finalise(header: &System::Header) {
// store the header hash in storage; we can't do it before otherwise there would be a // store the header hash in storage; we can't do it before otherwise there would be a
// cyclic dependency. // cyclic dependency.
<system::Module<System>>::record_block_hash(header) <system::Module<System>>::record_block_hash(header);
} }
} }
@@ -108,23 +108,27 @@ impl<T: Trait> Module<T> {
} }
/// Records a particular block number and hash combination. /// Records a particular block number and hash combination.
pub fn record_block_hash<H: traits::Header<Number = T::BlockNumber>>(header: &H) { pub fn record_block_hash<H: traits::Header<Number = T::BlockNumber, Hash = T::Hash>>(header: &H) {
// store the header hash in storage; we can't do it before otherwise there would be a // store the header hash in storage; we can't do it before otherwise there would be a
// cyclic dependency. // cyclic dependency.
<BlockHash<T>>::insert(header.number(), &T::Hashing::hash_of(header)); let h = T::Hashing::hash_of(header);
<BlockHash<T>>::insert(header.number(), &h);
Self::initialise(&(*header.number() + One::one()), &h, &Default::default());
} }
/// Initializes the state following the determination of the genesis block. /// Initializes the state following the determination of the genesis block.
pub fn initialise_genesis_state<H: traits::Header<Number = T::BlockNumber>>(header: &H) { pub fn initialise_genesis_state<H: traits::Header<Number = T::BlockNumber, Hash = T::Hash>>(header: &H) {
Self::record_block_hash(header); Self::record_block_hash(header);
} }
/// Calculate the current block's random seed. /// Calculate the current block's random seed.
fn calculate_random() -> T::Hash { fn calculate_random() -> T::Hash {
assert!(Self::block_number() > Zero::zero(), "Block number may never be zero");
(0..81) (0..81)
.scan( .scan(
{ let mut n = Self::block_number().clone(); n -= T::BlockNumber::one(); n }, Self::block_number() - One::one(),
|c, _| { if *c > T::BlockNumber::zero() { *c -= T::BlockNumber::one() }; Some(c.clone()) |c, _| { if *c > Zero::zero() { *c -= One::one() }; Some(*c)
}) })
.map(Self::block_hash) .map(Self::block_hash)
.triplet_mix() .triplet_mix()
+3
View File
@@ -0,0 +1,3 @@
#!/bin/bash
cp polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm polkadot/runtime/wasm/genesis.wasm