From 59af4de4fdb851dd736759d88fd77661e07b89ad Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 26 Nov 2018 16:24:26 +0100 Subject: [PATCH] GRANDPA: add commit messages (#1151) * get compiling with latest version of grandpa * generalize UntilImported to prepare for waiting for commit message targets * extract until_imported out to own module * logic for blocking commits until enough blocks imported * add tests for commit message blocking logic * pass through commit mesage round number as well * extract communication streams to own module * add Error implementation for ExitOrError * introduce stream adapter for checking commit messages * output sink for commits * implement the unimplemented * remove extra line * update to latest version of grandpa api * update finality-grandpa to 0.4.0 * Use filter_map earlier when checking incoming commits messages Co-Authored-By: rphmeier * address some grumbles --- substrate/Cargo.lock | 73 ++- substrate/core/client/src/lib.rs | 2 +- substrate/core/finality-grandpa/Cargo.toml | 3 +- .../finality-grandpa/src/communication.rs | 281 +++++++++ substrate/core/finality-grandpa/src/lib.rs | 431 +++++--------- substrate/core/finality-grandpa/src/tests.rs | 36 ++ .../finality-grandpa/src/until_imported.rs | 560 ++++++++++++++++++ 7 files changed, 1093 insertions(+), 293 deletions(-) create mode 100644 substrate/core/finality-grandpa/src/communication.rs create mode 100644 substrate/core/finality-grandpa/src/until_imported.rs diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index ef32ae3c9b..f155a7cc89 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -670,7 +670,7 @@ dependencies = [ [[package]] name = "finality-grandpa" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2196,6 +2196,33 @@ dependencies = [ "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rand" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_chacha 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_hc 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_isaac 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_pcg 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_xorshift 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_chacha" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rand_core" version = "0.2.2" @@ -2209,6 +2236,39 @@ name = "rand_core" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "rand_hc" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_isaac" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_pcg" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_xorshift" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rayon" version = "0.8.2" @@ -3238,12 +3298,13 @@ name = "substrate-finality-grandpa" version = "0.1.0" dependencies = [ "env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)", - "finality-grandpa 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "finality-grandpa 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 2.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec-derive 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 0.1.0", "substrate-client 0.1.0", "substrate-consensus-common 0.1.0", @@ -4440,7 +4501,7 @@ dependencies = [ "checksum failure_derive 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "64c2d913fe8ed3b6c6518eedf4538255b989945c14c2a7d5cbff62a5e2120596" "checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" "checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa" -"checksum finality-grandpa 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "be6d2735e8f570474c7925a60ebe04ec0bdd9eea7cc4fddab78a0ecfdefec20e" +"checksum finality-grandpa 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4fc88b8ddddcf3f998b8196d93c3ce31427c5b241cfe6c5a342e2a3f5d13ecbb" "checksum fixed-hash 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a557e80084b05c32b455963ff565a9de6f2866da023d6671705c6aff6f65e01c" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" @@ -4582,8 +4643,14 @@ dependencies = [ "checksum rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)" = "15a732abf9d20f0ad8eeb6f909bf6868722d9a06e1e50802b6a70351f40b4eb1" "checksum rand 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8356f47b32624fef5b3301c1be97e5944ecdd595409cc5da11d05f211db6cfbd" "checksum rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e464cd887e869cddcae8792a4ee31d23c7edd516700695608f5b98c67ee0131c" +"checksum rand 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de3f08319b5395bd19b70e73c4c465329495db02dafeb8ca711a20f1c2bd058c" +"checksum rand_chacha 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "771b009e3a508cb67e8823dda454aaa5368c7bc1c16829fb77d3e980440dd34a" "checksum rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1961a422c4d189dfb50ffa9320bf1f2a9bd54ecb92792fb9477f99a1045f3372" "checksum rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0905b6b7079ec73b314d4c748701f6931eb79fd97c668caa3f1899b22b32c6db" +"checksum rand_hc 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b40677c7be09ae76218dc623efbf7b18e34bced3f38883af07bb75630a21bc4" +"checksum rand_isaac 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2d6ecfe9ebf36acd47a49d150990b047a5f7db0a7236ee2414b7ff5cc1097c7b" +"checksum rand_pcg 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "086bd09a33c7044e56bb44d5bdde5a60e7f119a9e95b0775f545de759a32fe05" +"checksum rand_xorshift 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "effa3fcaa47e18db002bdde6060944b6d2f9cfd8db471c30e873448ad9187be3" "checksum rayon 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b614fe08b6665cb9a231d07ac1364b0ef3cb3698f1239ee0c4c3a88a524f54c8" "checksum rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "373814f27745b2686b350dd261bfd24576a6fb0e2c5919b3a2b6005f820b0473" "checksum rayon-core 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b055d1e92aba6877574d8fe604a63c8b5df60f60e5982bf7ccbb1338ea527356" diff --git a/substrate/core/client/src/lib.rs b/substrate/core/client/src/lib.rs index 58fced9687..a73671ca47 100644 --- a/substrate/core/client/src/lib.rs +++ b/substrate/core/client/src/lib.rs @@ -106,7 +106,7 @@ pub use client::{ new_with_backend, new_in_mem, BlockBody, BlockStatus, ImportNotifications, FinalityNotifications, BlockchainEvents, - Client, ClientInfo, ChainHead, + BlockImportNotification, Client, ClientInfo, ChainHead, }; #[cfg(feature = "std")] pub use notifications::{StorageEventStream, StorageChangeSet}; diff --git a/substrate/core/finality-grandpa/Cargo.toml b/substrate/core/finality-grandpa/Cargo.toml index 9fe2776744..6bf16e77d2 100644 --- a/substrate/core/finality-grandpa/Cargo.toml +++ b/substrate/core/finality-grandpa/Cargo.toml @@ -17,9 +17,10 @@ log = "0.4" parking_lot = "0.4" tokio = "0.1.7" substrate-finality-grandpa-primitives = { path = "primitives" } +rand = "0.6" [dependencies.finality-grandpa] -version = "0.3.0" +version = "0.4.0" features = ["derive-codec"] [dev-dependencies] diff --git a/substrate/core/finality-grandpa/src/communication.rs b/substrate/core/finality-grandpa/src/communication.rs new file mode 100644 index 0000000000..8e06a0e7ca --- /dev/null +++ b/substrate/core/finality-grandpa/src/communication.rs @@ -0,0 +1,281 @@ +// Copyright 2017-2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Incoming message streams that verify signatures, and outgoing message streams +//! that sign or re-shape. + +use futures::prelude::*; +use futures::sync::mpsc; +use codec::{Encode, Decode}; +use substrate_primitives::{ed25519, AuthorityId}; +use runtime_primitives::traits::Block as BlockT; +use {Error, Network, Message, SignedMessage, Commit, CompactCommit}; + +use std::collections::HashMap; +use std::sync::Arc; + +fn localized_payload(round: u64, set_id: u64, message: &E) -> Vec { + (message, round, set_id).encode() +} + +// check a message. +fn check_message_sig( + message: &Message, + id: &AuthorityId, + signature: &ed25519::Signature, + round: u64, + set_id: u64, +) -> Result<(), ()> { + let as_public = ::ed25519::Public::from_raw(id.0); + let encoded_raw = localized_payload(round, set_id, message); + if ::ed25519::verify_strong(signature, &encoded_raw, as_public) { + Ok(()) + } else { + debug!(target: "afg", "Bad signature on message from {:?}", id); + Err(()) + } +} + +/// converts a message stream into a stream of signed messages. +/// the output stream checks signatures also. +pub(crate) fn checked_message_stream( + round: u64, + set_id: u64, + inner: S, + voters: Arc>, +) + -> impl Stream,Error=Error> where + S: Stream,Error=()> +{ + inner + .filter_map(|raw| { + let decoded = SignedMessage::::decode(&mut &raw[..]); + if decoded.is_none() { + debug!(target: "afg", "Skipping malformed message {:?}", raw); + } + decoded + }) + .and_then(move |msg| { + // check signature. + if !voters.contains_key(&msg.id) { + debug!(target: "afg", "Skipping message from unknown voter {}", msg.id); + return Ok(None); + } + + // we ignore messages where the signature doesn't check out. + let res = check_message_sig::( + &msg.message, + &msg.id, + &msg.signature, + round, + set_id + ); + Ok(res.map(move |()| msg).ok()) + }) + .filter_map(|x| x) + .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) +} + +struct OutgoingMessages { + round: u64, + set_id: u64, + locals: Option<(Arc, AuthorityId)>, + sender: mpsc::UnboundedSender>, + network: N, +} + +impl Sink for OutgoingMessages { + type SinkItem = Message; + type SinkError = Error; + + fn start_send(&mut self, msg: Message) -> StartSend, Error> { + // when locals exist, sign messages on import + if let Some((ref pair, local_id)) = self.locals { + let encoded = localized_payload(self.round, self.set_id, &msg); + let signature = pair.sign(&encoded[..]); + let signed = SignedMessage:: { + message: msg, + signature, + id: local_id, + }; + + // forward to network and to inner sender. + self.network.send_message(self.round, self.set_id, signed.encode()); + let _ = self.sender.unbounded_send(signed); + } + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } + + fn close(&mut self) -> Poll<(), Error> { + // ignore errors since we allow this inner sender to be closed already. + self.sender.close().or_else(|_| Ok(Async::Ready(()))) + } +} + +impl Drop for OutgoingMessages { + fn drop(&mut self) { + self.network.drop_messages(self.round, self.set_id); + } +} + +/// A sink for outgoing messages. This signs the messages with the key, +/// if we are an authority. A stream for the signed messages is also returned. +/// +/// A future can push unsigned messages into the sink. They will be automatically +/// broadcast to the network. The returned stream should be combined with other input. +pub(crate) fn outgoing_messages( + round: u64, + set_id: u64, + local_key: Option>, + voters: Arc>, + network: N, +) -> ( + impl Stream,Error=Error>, + impl Sink,SinkError=Error>, +) { + let locals = local_key.and_then(|pair| { + let public = pair.public(); + let id = AuthorityId(public.0); + if voters.contains_key(&id) { + Some((pair, id)) + } else { + None + } + }); + + let (tx, rx) = mpsc::unbounded(); + let outgoing = OutgoingMessages:: { + round, + set_id, + network, + locals, + sender: tx, + }; + + let rx = rx.map_err(move |()| Error::Network( + format!("Failed to receive on unbounded receiver for round {}", round) + )); + + (rx, outgoing) +} + +fn check_compact_commit( + msg: CompactCommit, + voters: &HashMap, + round: u64, + set_id: u64, +) -> Option> { + use grandpa::Message as GrandpaMessage; + if msg.precommits.len() != msg.auth_data.len() || msg.precommits.is_empty() { + debug!(target: "afg", "Skipping malformed compact commit"); + return None; + } + + // check signatures on all contained precommits. + for (precommit, &(ref sig, ref id)) in msg.precommits.iter().zip(&msg.auth_data) { + if !voters.contains_key(id) { + debug!(target: "afg", "Skipping commit containing unknown voter {}", id); + return None; + } + + let res = check_message_sig::( + &GrandpaMessage::Precommit(precommit.clone()), + id, + sig, + round, + set_id, + ); + + if let Err(()) = res { + debug!(target: "afg", "Skipping commit containing bad message"); + return None; + } + } + + Some(msg) +} + +/// A stream for incoming commit messages. This checks all the signatures on the +/// messages. +pub(crate) fn checked_commit_stream( + set_id: u64, + inner: S, + voters: Arc>, +) + -> impl Stream),Error=Error> where + S: Stream,Error=()> +{ + inner + .filter_map(|raw| { + // this could be optimized by decoding piecewise. + let decoded = <(u64, CompactCommit)>::decode(&mut &raw[..]); + if decoded.is_none() { + trace!(target: "afg", "Skipping malformed commit message {:?}", raw); + } + decoded + }) + .filter_map(move |(round, msg)| { + check_compact_commit::(msg, &*voters, round, set_id).map(move |c| (round, c)) + }) + .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) +} + +/// An output sink for commit messages. +pub(crate) struct CommitsOut { + network: N, + set_id: u64, + _marker: ::std::marker::PhantomData, +} + +impl CommitsOut { + /// Create a new commit output stream. + pub(crate) fn new(network: N, set_id: u64) -> Self { + CommitsOut { + network, + set_id, + _marker: Default::default(), + } + } +} + +impl Sink for CommitsOut { + type SinkItem = (u64, Commit); + type SinkError = Error; + + fn start_send(&mut self, input: (u64, Commit)) -> StartSend { + let (round, commit) = input; + let (precommits, auth_data) = commit.precommits.into_iter() + .map(|signed| (signed.precommit, (signed.signature, signed.id))) + .unzip(); + + let compact_commit = CompactCommit:: { + target_hash: commit.target_hash, + target_number: commit.target_number, + precommits, + auth_data + }; + + self.network.send_commit(self.set_id, Encode::encode(&(round, compact_commit))); + + Ok(AsyncSink::Ready) + } + + fn close(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } + fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } +} diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index 956b87df0e..9b48f5aa51 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -61,6 +61,7 @@ extern crate tokio; extern crate parking_lot; extern crate parity_codec as codec; extern crate substrate_finality_grandpa_primitives as fg_primitives; +extern crate rand; #[macro_use] extern crate log; @@ -81,35 +82,40 @@ extern crate env_logger; extern crate parity_codec_derive; use futures::prelude::*; -use futures::stream::Fuse; use futures::sync::mpsc; -use client::{Client, error::Error as ClientError, ImportNotifications, backend::Backend, CallExecutor}; +use client::{ + Client, error::Error as ClientError, backend::Backend, CallExecutor, BlockchainEvents +}; use client::blockchain::HeaderBackend; use client::runtime_api::TaggedTransactionQueue; use codec::{Encode, Decode}; use consensus_common::{BlockImport, ImportBlock, ImportResult, Authorities}; use runtime_primitives::traits::{ - NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi + NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi, Hash as HashT, }; use fg_primitives::GrandpaApi; use runtime_primitives::generic::BlockId; use substrate_primitives::{ed25519, H256, AuthorityId, Blake2Hasher}; -use tokio::timer::Interval; +use tokio::timer::Delay; use grandpa::Error as GrandpaError; use grandpa::{voter, round::State as RoundState, Equivocation, BlockNumberOps}; use network::{Service as NetworkService, ExHashT}; use network::consensus_gossip::{ConsensusMessage}; -use std::collections::{VecDeque, HashMap}; +use std::collections::HashMap; +use std::fmt; use std::sync::Arc; use std::time::{Instant, Duration}; use authorities::SharedAuthoritySet; +use until_imported::{UntilCommitBlocksImported, UntilVoteTargetImported}; pub use fg_primitives::ScheduledChange; mod authorities; +mod communication; +mod until_imported; #[cfg(feature="service-integration")] mod service_integration; @@ -138,6 +144,20 @@ pub type SignedMessage = grandpa::SignedMessage< pub type Prevote = grandpa::Prevote<::Hash, NumberFor>; /// A precommit message for this chain's block type. pub type Precommit = grandpa::Precommit<::Hash, NumberFor>; +/// A commit message for this chain's block type. +pub type Commit = grandpa::Commit< + ::Hash, + NumberFor, + ed25519::Signature, + AuthorityId +>; +/// A compact commit message for this chain's block type. +pub type CompactCommit = grandpa::CompactCommit< + ::Hash, + NumberFor, + ed25519::Signature, + AuthorityId +>; /// Configuration for the GRANDPA service. #[derive(Clone)] @@ -181,7 +201,7 @@ impl From for Error { /// handle to a gossip service or similar. /// /// Intended to be a lightweight handle such as an `Arc`. -pub trait Network : Clone { +pub trait Network: Clone { /// A stream of input messages for a topic. type In: Stream,Error=()>; @@ -194,6 +214,13 @@ pub trait Network : Clone { /// Clean up messages for a round. fn drop_messages(&self, round: u64, set_id: u64); + + /// Get a stream of commit messages for a specific set-id. This stream + /// should never logically conclude. + fn commit_messages(&self, set_id: u64) -> Self::In; + + /// Send message over the commit channel. + fn send_commit(&self, set_id: u64, message: Vec); } /// Bridge between NetworkService, gossiping consensus messages and Grandpa @@ -208,7 +235,6 @@ impl, H: ExHashT } } - impl, H: ExHashT> Clone for NetworkBridge { fn clone(&self) -> Self { NetworkBridge { @@ -218,10 +244,13 @@ impl, H: ExHashT } fn message_topic(round: u64, set_id: u64) -> B::Hash { - use runtime_primitives::traits::Hash as HashT; <::Hashing as HashT>::hash(format!("{}-{}", set_id, round).as_bytes()) } +fn commit_topic(set_id: u64) -> B::Hash { + <::Hashing as HashT>::hash(format!("{}-COMMITS", set_id).as_bytes()) +} + impl, H: ExHashT> Network for NetworkBridge { type In = mpsc::UnboundedReceiver; fn messages_for(&self, round: u64, set_id: u64) -> Self::In { @@ -240,6 +269,18 @@ impl, H: ExHashT let topic = message_topic::(round, set_id); self.service.consensus_gossip().write().collect_garbage(|t| t == &topic); } + + fn commit_messages(&self, set_id: u64) -> Self::In { + self.service.consensus_gossip().write().messages_for(commit_topic::(set_id)) + } + + fn send_commit(&self, set_id: u64, message: Vec) { + let topic = commit_topic::(set_id); + let gossip = self.service.consensus_gossip(); + self.service.with_spec(move |_s, context|{ + gossip.write().multicast(context, topic, message); + }); + } } /// Something which can determine if a block is known. @@ -262,269 +303,6 @@ impl, RA> BlockStatus for Arc { - import_notifications: Fuse>, - status_check: Status, - inner: Fuse, - ready: VecDeque>, - check_pending: Interval, - pending: HashMap>>, -} - -impl, I: Stream> UntilImported { - fn new( - import_notifications: ImportNotifications, - status_check: Status, - stream: I, - ) -> Self { - // how often to check if pending messages that are waiting for blocks to be - // imported can be checked. - // - // the import notifications interval takes care of most of this; this is - // used in the event of missed import notifications - const CHECK_PENDING_INTERVAL: Duration = Duration::from_secs(5); - let now = Instant::now(); - - let check_pending = Interval::new(now + CHECK_PENDING_INTERVAL, CHECK_PENDING_INTERVAL); - UntilImported { - import_notifications: import_notifications.fuse(), - status_check, - inner: stream.fuse(), - ready: VecDeque::new(), - check_pending, - pending: HashMap::new(), - } - } -} - -impl, I> Stream for UntilImported - where I: Stream,Error=Error> -{ - type Item = SignedMessage; - type Error = Error; - - fn poll(&mut self) -> Poll>, Error> { - loop { - match self.inner.poll() { - Err(e) => return Err(e), - Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), - Ok(Async::Ready(Some(signed_message))) => { - let (&target_hash, target_number) = signed_message.target(); - - // new message: hold it until the block is known. - if let Some(number) = self.status_check.block_number(target_hash)? { - if number != target_number { - warn!( - target: "afg", - "Authority {:?} signed GRANDPA message with \ - wrong block number for hash {}", - signed_message.id, - target_hash - ); - } else { - self.ready.push_back(signed_message) - } - } else { - self.pending.entry(target_hash) - .or_insert_with(Vec::new) - .push(signed_message); - } - } - Ok(Async::NotReady) => break, - } - } - - loop { - match self.import_notifications.poll() { - Err(_) => return Err(Error::Network(format!("Failed to get new message"))), - Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), - Ok(Async::Ready(Some(notification))) => { - // new block imported. queue up all messages tied to that hash. - if let Some(messages) = self.pending.remove(¬ification.hash) { - self.ready.extend(messages); - } - } - Ok(Async::NotReady) => break, - } - } - - let mut update_interval = false; - while let Async::Ready(Some(_)) = self.check_pending.poll().map_err(Error::Timer)? { - update_interval = true; - } - - if update_interval { - let mut known_keys = Vec::new(); - for &block_hash in self.pending.keys() { - if let Some(number) = self.status_check.block_number(block_hash)? { - known_keys.push((block_hash, number)); - } - } - - for (known_hash, canon_number) in known_keys { - if let Some(mut pending_messages) = self.pending.remove(&known_hash) { - // verify canonicality of pending messages. - pending_messages.retain(|msg| { - let number_correct = msg.target().1 == canon_number; - if !number_correct { - warn!( - target: "afg", - "Authority {:?} signed GRANDPA message with \ - wrong block number for hash {}", - msg.id, - known_hash, - ); - } - number_correct - }); - self.ready.extend(pending_messages); - } - } - } - - if let Some(ready) = self.ready.pop_front() { - return Ok(Async::Ready(Some(ready))) - } - - if self.import_notifications.is_done() && self.inner.is_done() { - Ok(Async::Ready(None)) - } else { - Ok(Async::NotReady) - } - } -} - -// clears the network messages for inner round on drop. -struct ClearOnDrop { - round: u64, - set_id: u64, - inner: I, - network: N, -} - -impl Sink for ClearOnDrop { - type SinkItem = I::SinkItem; - type SinkError = I::SinkError; - - fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.poll_complete() - } - - fn close(&mut self) -> Poll<(), Self::SinkError> { - self.inner.close() - } -} - -impl Drop for ClearOnDrop { - fn drop(&mut self) { - self.network.drop_messages(self.round, self.set_id); - } -} - -fn localized_payload(round: u64, set_id: u64, message: &E) -> Vec { - let mut v = message.encode(); - - round.using_encoded(|s| v.extend(s)); - set_id.using_encoded(|s| v.extend(s)); - - v -} - -// converts a message stream into a stream of signed messages. -// the output stream checks signatures also. -fn checked_message_stream( - round: u64, - set_id: u64, - inner: S, - voters: Arc>, -) - -> impl Stream,Error=Error> where - S: Stream,Error=()> -{ - inner - .filter_map(|raw| { - let decoded = SignedMessage::::decode(&mut &raw[..]); - if decoded.is_none() { - debug!(target: "afg", "Skipping malformed message {:?}", raw); - } - decoded - }) - .and_then(move |msg| { - // check signature. - if !voters.contains_key(&msg.id) { - debug!(target: "afg", "Skipping message from unknown voter {}", msg.id); - return Ok(None); - } - - let as_public = ::ed25519::Public::from_raw(msg.id.0); - let encoded_raw = localized_payload(round, set_id, &msg.message); - if ::ed25519::verify_strong(&msg.signature, &encoded_raw, as_public) { - Ok(Some(msg)) - } else { - debug!(target: "afg", "Skipping message with bad signature"); - Ok(None) - } - }) - .filter_map(|x| x) - .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) -} - -fn outgoing_messages( - round: u64, - set_id: u64, - local_key: Option>, - voters: Arc>, - network: N, -) -> ( - impl Stream,Error=Error>, - impl Sink,SinkError=Error>, -) { - let locals = local_key.and_then(|pair| { - let public = pair.public(); - let id = AuthorityId(public.0); - if voters.contains_key(&id) { - Some((pair, id)) - } else { - None - } - }); - - let (tx, rx) = mpsc::unbounded(); - let rx = rx - .map(move |msg: Message| { - // when locals exist, sign messages on import - if let Some((ref pair, local_id)) = locals { - let encoded = localized_payload(round, set_id, &msg); - let signature = pair.sign(&encoded[..]); - let signed = SignedMessage:: { - message: msg, - signature, - id: local_id, - }; - - // forward to network. - network.send_message(round, set_id, signed.encode()); - Some(signed) - } else { - None - } - }) - .filter_map(|x| x) - .map_err(move |()| Error::Network( - format!("Failed to receive on unbounded receiver for round {}", round) - )); - - let tx = tx.sink_map_err(move |e| Error::Network(format!("Failed to broadcast message \ - to network in round {}: {:?}", round, e))); - - (rx, tx) -} - /// The environment we run GRANDPA in. struct Environment { inner: Arc>, @@ -633,6 +411,17 @@ impl From for ExitOrError { } } +impl ::std::error::Error for ExitOrError { } + +impl fmt::Display for ExitOrError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + ExitOrError::Error(ref e) => write!(f, "{:?}", e), + ExitOrError::AuthoritiesChanged(_) => write!(f, "restarting voter on new authorities"), + } + } +} + impl, N, RA> voter::Environment> for Environment where Block: 'static, B: Backend + 'static, @@ -645,6 +434,8 @@ impl, N, RA> voter::Environment + Send>; type Id = AuthorityId; type Signature = ed25519::Signature; + + // regular round message streams type In = Box, Self::Signature, Self::Id>, Error = Self::Error, @@ -653,29 +444,26 @@ impl, N, RA> voter::Environment>, SinkError = Self::Error, > + Send>; + type Error = ExitOrError>; - #[allow(unreachable_code)] fn round_data( &self, round: u64 - ) -> voter::RoundData { - use client::BlockchainEvents; - use tokio::timer::Delay; - + ) -> voter::RoundData { let now = Instant::now(); let prevote_timer = Delay::new(now + self.config.gossip_duration * 2); let precommit_timer = Delay::new(now + self.config.gossip_duration * 4); // TODO: dispatch this with `mpsc::spawn`. - let incoming = checked_message_stream::( + let incoming = ::communication::checked_message_stream::( round, self.set_id, self.network.messages_for(round, self.set_id), self.voters.clone(), ); - let (out_rx, outgoing) = outgoing_messages::( + let (out_rx, outgoing) = ::communication::outgoing_messages::( round, self.set_id, self.config.local_key.clone(), @@ -685,7 +473,7 @@ impl, N, RA> voter::Environment, N, RA> voter::Environment, N, RA> voter::Environment) -> Result<(), Self::Error> { + fn finalize_block(&self, hash: Block::Hash, number: NumberFor, _commit: Commit) -> Result<(), Self::Error> { // ideally some handle to a synchronization oracle would be used // to avoid unconditionally notifying. if let Err(e) = self.inner.finalize_block(BlockId::Hash(hash), true) { @@ -794,6 +576,7 @@ impl, N, RA> voter::Environment, N, RA> voter::Environment Self::Timer { + use rand::{thread_rng, Rng}; + + //random between 0-1 seconds. + let delay: u64 = thread_rng().gen_range(0, 1000); + Box::new(Delay::new( + Instant::now() + Duration::from_millis(delay) + ).map_err(|e| Error::Timer(e).into())) + } + fn prevote_equivocation( &self, _round: u64, @@ -969,6 +762,52 @@ pub fn block_import, RA, PRA>( )) } +fn committer_communication, B, E, N, RA>( + set_id: u64, + voters: &Arc>, + client: &Arc>, + network: &N, +) -> ( + impl Stream< + Item = (u64, ::grandpa::CompactCommit, ed25519::Signature, AuthorityId>), + Error = ExitOrError>, + >, + impl Sink< + SinkItem = (u64, ::grandpa::Commit, ed25519::Signature, AuthorityId>), + SinkError = ExitOrError>, + >, +) where + B: Backend, + E: CallExecutor + Send + Sync, + N: Network, + RA: Send + Sync, + NumberFor: BlockNumberOps, +{ + // verification stream + let commit_in = ::communication::checked_commit_stream::( + set_id, + network.commit_messages(set_id), + voters.clone(), + ); + + // block commit messages until relevant blocks are imported. + let commit_in = UntilCommitBlocksImported::new( + client.import_notification_stream(), + client.clone(), + commit_in, + ); + + let commit_out = ::communication::CommitsOut::::new( + network.clone(), + set_id, + ); + + let commit_in = commit_in.map_err(Into::into); + let commit_out = commit_out.sink_map_err(Into::into); + + (commit_in, commit_out) +} + /// Run a GRANDPA voter as a task. Provide configuration and a link to a /// block import worker that has already been instantiated with `block_import`. pub fn run_grandpa, N, RA>( @@ -1027,7 +866,23 @@ pub fn run_grandpa, N, RA>( chain_info.chain.finalized_number, ); - let voter = voter::Voter::new(env, last_round_number, last_state, last_finalized); + let committer_data = committer_communication( + env.set_id, + &env.voters, + &client, + &network, + ); + + let voters = (*env.voters).clone(); + + let voter = voter::Voter::new( + env, + voters, + committer_data, + last_round_number, + last_state, + last_finalized, + ); let client = client.clone(); let config = config.clone(); let network = network.clone(); diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index ef5228e0fc..c8ba80098d 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -157,6 +157,21 @@ fn make_topic(round: u64, set_id: u64) -> Hash { hash } +fn make_commit_topic(set_id: u64) -> Hash { + let mut hash = Hash::default(); + + { + let raw = hash.as_mut(); + raw[16..22].copy_from_slice(b"commit"); + } + set_id.using_encoded(|s| { + let raw = hash.as_mut(); + raw[24..].copy_from_slice(s); + }); + + hash +} + impl Network for MessageRouting { type In = Box,Error=()> + Send>; @@ -190,6 +205,27 @@ impl Network for MessageRouting { gossip.collect_garbage(|t| t == &topic) }); } + + fn commit_messages(&self, set_id: u64) -> Self::In { + let inner = self.inner.lock(); + let peer = inner.peer(self.peer_id); + let mut gossip = peer.consensus_gossip().write(); + let messages = peer.with_spec(move |_, _| { + gossip.messages_for(make_commit_topic(set_id)) + }); + + let messages = messages.map_err( + move |_| panic!("Commit messages for set {} dropped too early", set_id) + ); + + Box::new(messages) + } + + fn send_commit(&self, set_id: u64, message: Vec) { + let mut inner = self.inner.lock(); + inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), message); + inner.route_until_complete(); + } } #[derive(Default, Clone)] diff --git a/substrate/core/finality-grandpa/src/until_imported.rs b/substrate/core/finality-grandpa/src/until_imported.rs new file mode 100644 index 0000000000..aba0472131 --- /dev/null +++ b/substrate/core/finality-grandpa/src/until_imported.rs @@ -0,0 +1,560 @@ +// Copyright 2017-2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Helper stream for waiting until one or more blocks are imported before +//! passing through inner items. This is done in a generic way to support +//! many different kinds of items. +//! +//! This is used for votes and commit messages currently. + +use super::{BlockStatus, Error, SignedMessage, CompactCommit}; + +use client::ImportNotifications; +use futures::prelude::*; +use futures::stream::Fuse; +use parking_lot::Mutex; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; +use substrate_primitives::AuthorityId; +use tokio::timer::Interval; + +use std::collections::{HashMap, VecDeque}; +use std::sync::{atomic::{AtomicUsize, Ordering}, Arc}; +use std::time::{Duration, Instant}; + +// something which will block until imported. +pub(crate) trait BlockUntilImported: Sized { + // the type that is blocked on. + type Blocked; + + /// new incoming item. For all internal items, + /// check if they require to be waited for. + /// if so, call the `Wait` closure. + /// if they are ready, call the `Ready` closure. + fn schedule_wait( + input: Self::Blocked, + status_check: &S, + wait: Wait, + ready: Ready, + ) -> Result<(), Error> where + S: BlockStatus, + Wait: FnMut(Block::Hash, Self), + Ready: FnMut(Self::Blocked); + + /// called when the wait has completed. The canonical number is passed through + /// for further checks. + fn wait_completed(self, canon_number: NumberFor) -> Option; +} + +/// Buffering imported messages until blocks with given hashes are imported. +pub(crate) struct UntilImported> { + import_notifications: Fuse>, + status_check: Status, + inner: Fuse, + ready: VecDeque, + check_pending: Interval, + pending: HashMap>, +} + +impl UntilImported + where Status: BlockStatus, M: BlockUntilImported +{ + /// Create a new `UntilImported` wrapper. + pub(crate) fn new( + import_notifications: ImportNotifications, + status_check: Status, + stream: I, + ) -> Self { + // how often to check if pending messages that are waiting for blocks to be + // imported can be checked. + // + // the import notifications interval takes care of most of this; this is + // used in the event of missed import notifications + const CHECK_PENDING_INTERVAL: Duration = Duration::from_secs(5); + let now = Instant::now(); + + let check_pending = Interval::new(now + CHECK_PENDING_INTERVAL, CHECK_PENDING_INTERVAL); + UntilImported { + import_notifications: import_notifications.fuse(), + status_check, + inner: stream.fuse(), + ready: VecDeque::new(), + check_pending, + pending: HashMap::new(), + } + } +} + +impl Stream for UntilImported where + Status: BlockStatus, + I: Stream, + M: BlockUntilImported, +{ + type Item = M::Blocked; + type Error = Error; + + fn poll(&mut self) -> Poll, Error> { + loop { + match self.inner.poll()? { + Async::Ready(None) => return Ok(Async::Ready(None)), + Async::Ready(Some(input)) => { + // new input: schedule wait of any parts which require + // blocks to be known. + let mut ready = &mut self.ready; + let mut pending = &mut self.pending; + M::schedule_wait( + input, + &self.status_check, + |target_hash, wait| pending + .entry(target_hash) + .or_insert_with(Vec::new) + .push(wait), + |ready_item| ready.push_back(ready_item), + )?; + } + Async::NotReady => break, + } + } + + loop { + match self.import_notifications.poll() { + Err(_) => return Err(Error::Network(format!("Failed to get new message"))), + Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), + Ok(Async::Ready(Some(notification))) => { + // new block imported. queue up all messages tied to that hash. + if let Some(messages) = self.pending.remove(¬ification.hash) { + let canon_number = notification.header.number().clone(); + let ready_messages = messages.into_iter() + .filter_map(|m| m.wait_completed(canon_number)); + + self.ready.extend(ready_messages); + } + } + Ok(Async::NotReady) => break, + } + } + + let mut update_interval = false; + while let Async::Ready(Some(_)) = self.check_pending.poll().map_err(Error::Timer)? { + update_interval = true; + } + + if update_interval { + let mut known_keys = Vec::new(); + for &block_hash in self.pending.keys() { + if let Some(number) = self.status_check.block_number(block_hash)? { + known_keys.push((block_hash, number)); + } + } + + for (known_hash, canon_number) in known_keys { + if let Some(pending_messages) = self.pending.remove(&known_hash) { + let ready_messages = pending_messages.into_iter() + .filter_map(|m| m.wait_completed(canon_number)); + + self.ready.extend(ready_messages); + } + } + } + + if let Some(ready) = self.ready.pop_front() { + return Ok(Async::Ready(Some(ready))) + } + + if self.import_notifications.is_done() && self.inner.is_done() { + Ok(Async::Ready(None)) + } else { + Ok(Async::NotReady) + } + } +} + +fn warn_authority_wrong_target(hash: H, id: AuthorityId) { + warn!( + target: "afg", + "Authority {:?} signed GRANDPA message with \ + wrong block number for hash {}", + id, + hash, + ); +} + +impl BlockUntilImported for SignedMessage { + type Blocked = Self; + + fn schedule_wait( + msg: Self::Blocked, + status_check: &S, + mut wait: Wait, + mut ready: Ready, + ) -> Result<(), Error> where + S: BlockStatus, + Wait: FnMut(Block::Hash, Self), + Ready: FnMut(Self::Blocked), + { + let (&target_hash, target_number) = msg.target(); + + if let Some(number) = status_check.block_number(target_hash)? { + if number != target_number { + warn_authority_wrong_target(target_hash, msg.id); + } else { + ready(msg); + } + } else { + wait(target_hash, msg) + } + + Ok(()) + } + + fn wait_completed(self, canon_number: NumberFor) -> Option { + let (&target_hash, target_number) = self.target(); + if canon_number != target_number { + warn_authority_wrong_target(target_hash, self.id); + + None + } else { + Some(self) + } + } +} + +/// Helper type definition for the stream which waits until vote targets for +/// signed messages are imported. +pub(crate) type UntilVoteTargetImported = UntilImported>; + +/// This blocks a commit message's import until all blocks +/// referenced in its votes are known. +/// +/// This is used for compact commits which have already been checked for +/// structural soundness. +pub(crate) struct BlockCommitMessage { + inner: Arc<(AtomicUsize, Mutex)>>)>, + target_number: NumberFor, +} + +impl BlockUntilImported for BlockCommitMessage { + type Blocked = (u64, CompactCommit); + + fn schedule_wait( + input: Self::Blocked, + status_check: &S, + mut wait: Wait, + mut ready: Ready, + ) -> Result<(), Error> where + S: BlockStatus, + Wait: FnMut(Block::Hash, Self), + Ready: FnMut(Self::Blocked), + { + use std::collections::hash_map::Entry; + + enum KnownOrUnknown { + Known(N), + Unknown(N), + } + + impl KnownOrUnknown { + fn number(&self) -> &N { + match *self { + KnownOrUnknown::Known(ref n) => n, + KnownOrUnknown::Unknown(ref n) => n, + } + } + } + + let mut checked_hashes: HashMap<_, KnownOrUnknown>> = HashMap::new(); + let mut unknown_count = 0; + + { + // returns false when should early exit. + let mut query_known = |target_hash, perceived_number| -> Result { + // check integrity: all precommits for same hash have same number. + let canon_number = match checked_hashes.entry(target_hash) { + Entry::Occupied(entry) => entry.get().number().clone(), + Entry::Vacant(mut entry) => { + if let Some(number) = status_check.block_number(target_hash)? { + entry.insert(KnownOrUnknown::Known(number)); + number + + } else { + entry.insert(KnownOrUnknown::Unknown(perceived_number)); + unknown_count += 1; + perceived_number + } + } + }; + + if canon_number != perceived_number { + // invalid commit: messages targeting wrong number or + // at least different from other vote. in same commit. + return Ok(false); + } + + Ok(true) + }; + + let commit = &input.1; + + // add known hashes from the precommits. + for precommit in &commit.precommits { + let target_number = precommit.target_number; + let target_hash = precommit.target_hash; + + if !query_known(target_hash, target_number)? { + return Ok(()) + } + } + + // see if commit target hash is known. + if !query_known(commit.target_hash, commit.target_number)? { + return Ok(()) + } + } + + // none of the hashes in the commit message were unknown. + // we can just return the commit directly. + if unknown_count == 0 { + ready(input); + return Ok(()) + } + + let locked_commit = Arc::new((AtomicUsize::new(unknown_count), Mutex::new(Some(input)))); + + // schedule waits for all unknown messages. + // when the last one of these has `wait_completed` called on it, + // the commit will be returned. + // + // in the future, we may want to issue sync requests to the network + // if this is taking a long time. + for (hash, is_known) in checked_hashes { + if let KnownOrUnknown::Unknown(target_number) = is_known { + wait(hash, BlockCommitMessage { + inner: locked_commit.clone(), + target_number, + }) + } + } + + Ok(()) + } + + fn wait_completed(self, canon_number: NumberFor) -> Option { + if self.target_number != canon_number { + // if we return without deducting the counter, then none of the other + // handles can return the commit message. + return None; + } + + let mut last_count = self.inner.0.load(Ordering::Acquire); + + // CAS loop to ensure that we always have a last reader. + loop { + if last_count == 1 { // we are the last one left. + return self.inner.1.lock().take(); + } + + let prev_value = self.inner.0.compare_and_swap( + last_count, + last_count - 1, + Ordering::SeqCst, + ); + + if prev_value == last_count { + return None; + } else { + last_count = prev_value; + } + } + } +} + +/// A stream which gates off incoming commit messages until all referenced +/// block hashes have been imported. +pub(crate) type UntilCommitBlocksImported = UntilImported< + Block, + Status, + I, + BlockCommitMessage, +>; + +#[cfg(test)] +mod tests { + use super::*; + use tokio::runtime::current_thread::Runtime; + use tokio::timer::Delay; + use test_client::runtime::{Block, Hash, Header}; + use consensus_common::BlockOrigin; + use client::BlockImportNotification; + use futures::future::Either; + use futures::sync::mpsc; + use grandpa::Precommit; + + #[derive(Clone)] + struct TestChainState { + sender: mpsc::UnboundedSender>, + known_blocks: Arc>>, + } + + impl TestChainState { + fn new() -> (Self, ImportNotifications) { + let (tx, rx) = mpsc::unbounded(); + let state = TestChainState { + sender: tx, + known_blocks: Arc::new(Mutex::new(HashMap::new())), + }; + + (state, rx) + } + + fn block_status(&self) -> TestBlockStatus { + TestBlockStatus { inner: self.known_blocks.clone() } + } + + fn import_header(&self, header: Header) { + let hash = header.hash(); + let number = header.number().clone(); + + self.known_blocks.lock().insert(hash, number); + self.sender.unbounded_send(BlockImportNotification { + hash, + origin: BlockOrigin::File, + header, + is_new_best: false, + tags: Vec::new(), + }).unwrap(); + } + } + + struct TestBlockStatus { + inner: Arc>>, + } + + impl BlockStatus for TestBlockStatus { + fn block_number(&self, hash: Hash) -> Result, Error> { + Ok(self.inner.lock().get(&hash).map(|x| x.clone())) + } + } + + fn make_header(number: u64) -> Header { + Header::new( + number, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + ) + } + + #[test] + fn blocking_commit_message() { + let h1 = make_header(5); + let h2 = make_header(6); + let h3 = make_header(7); + + let (chain_state, import_notifications) = TestChainState::new(); + let block_status = chain_state.block_status(); + + let unknown_commit = CompactCommit:: { + target_hash: h1.hash(), + target_number: 5, + precommits: vec![ + Precommit { + target_hash: h2.hash(), + target_number: 6, + }, + Precommit { + target_hash: h3.hash(), + target_number: 7, + }, + ], + auth_data: Vec::new(), // not used + }; + + let (commit_tx, commit_rx) = mpsc::unbounded(); + + let until_imported = UntilCommitBlocksImported::new( + import_notifications, + block_status, + commit_rx.map_err(|_| panic!("should never error")), + ); + + commit_tx.unbounded_send((0, unknown_commit.clone())).unwrap(); + + let inner_chain_state = chain_state.clone(); + let work = until_imported + .into_future() + .select2(Delay::new(Instant::now() + Duration::from_millis(100))) + .then(move |res| match res { + Err(_) => panic!("neither should have had error"), + Ok(Either::A(_)) => panic!("timeout should have fired first"), + Ok(Either::B((_, until_imported))) => { + // timeout fired. push in the headers. + inner_chain_state.import_header(h1); + inner_chain_state.import_header(h2); + inner_chain_state.import_header(h3); + + until_imported + } + }); + + let mut runtime = Runtime::new().unwrap(); + assert_eq!(runtime.block_on(work).map_err(|(e, _)| e).unwrap().0, Some((0, unknown_commit))); + } + + #[test] + fn commit_message_all_known() { + let h1 = make_header(5); + let h2 = make_header(6); + let h3 = make_header(7); + + let (chain_state, import_notifications) = TestChainState::new(); + let block_status = chain_state.block_status(); + + let known_commit = CompactCommit:: { + target_hash: h1.hash(), + target_number: 5, + precommits: vec![ + Precommit { + target_hash: h2.hash(), + target_number: 6, + }, + Precommit { + target_hash: h3.hash(), + target_number: 7, + }, + ], + auth_data: Vec::new(), // not used + }; + + chain_state.import_header(h1); + chain_state.import_header(h2); + chain_state.import_header(h3); + + let (commit_tx, commit_rx) = mpsc::unbounded(); + + let until_imported = UntilCommitBlocksImported::new( + import_notifications, + block_status, + commit_rx.map_err(|_| panic!("should never error")), + ); + + commit_tx.unbounded_send((0, known_commit.clone())).unwrap(); + + let work = until_imported.into_future(); + + let mut runtime = Runtime::new().unwrap(); + assert_eq!(runtime.block_on(work).map_err(|(e, _)| e).unwrap().0, Some((0, known_commit))); + } +}