diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock
index ef32ae3c9b..f155a7cc89 100644
--- a/substrate/Cargo.lock
+++ b/substrate/Cargo.lock
@@ -670,7 +670,7 @@ dependencies = [
[[package]]
name = "finality-grandpa"
-version = "0.3.0"
+version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2196,6 +2196,33 @@ dependencies = [
"winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
+[[package]]
+name = "rand"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand_chacha 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand_hc 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand_isaac 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand_pcg 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand_xorshift 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "rand_chacha"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
[[package]]
name = "rand_core"
version = "0.2.2"
@@ -2209,6 +2236,39 @@ name = "rand_core"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
+[[package]]
+name = "rand_hc"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "rand_isaac"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "rand_pcg"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "rand_xorshift"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
[[package]]
name = "rayon"
version = "0.8.2"
@@ -3238,12 +3298,13 @@ name = "substrate-finality-grandpa"
version = "0.1.0"
dependencies = [
"env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)",
- "finality-grandpa 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "finality-grandpa 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-codec 2.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-codec-derive 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"sr-primitives 0.1.0",
"substrate-client 0.1.0",
"substrate-consensus-common 0.1.0",
@@ -4440,7 +4501,7 @@ dependencies = [
"checksum failure_derive 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "64c2d913fe8ed3b6c6518eedf4538255b989945c14c2a7d5cbff62a5e2120596"
"checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
"checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa"
-"checksum finality-grandpa 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "be6d2735e8f570474c7925a60ebe04ec0bdd9eea7cc4fddab78a0ecfdefec20e"
+"checksum finality-grandpa 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4fc88b8ddddcf3f998b8196d93c3ce31427c5b241cfe6c5a342e2a3f5d13ecbb"
"checksum fixed-hash 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a557e80084b05c32b455963ff565a9de6f2866da023d6671705c6aff6f65e01c"
"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3"
"checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
@@ -4582,8 +4643,14 @@ dependencies = [
"checksum rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)" = "15a732abf9d20f0ad8eeb6f909bf6868722d9a06e1e50802b6a70351f40b4eb1"
"checksum rand 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8356f47b32624fef5b3301c1be97e5944ecdd595409cc5da11d05f211db6cfbd"
"checksum rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e464cd887e869cddcae8792a4ee31d23c7edd516700695608f5b98c67ee0131c"
+"checksum rand 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de3f08319b5395bd19b70e73c4c465329495db02dafeb8ca711a20f1c2bd058c"
+"checksum rand_chacha 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "771b009e3a508cb67e8823dda454aaa5368c7bc1c16829fb77d3e980440dd34a"
"checksum rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1961a422c4d189dfb50ffa9320bf1f2a9bd54ecb92792fb9477f99a1045f3372"
"checksum rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0905b6b7079ec73b314d4c748701f6931eb79fd97c668caa3f1899b22b32c6db"
+"checksum rand_hc 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b40677c7be09ae76218dc623efbf7b18e34bced3f38883af07bb75630a21bc4"
+"checksum rand_isaac 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2d6ecfe9ebf36acd47a49d150990b047a5f7db0a7236ee2414b7ff5cc1097c7b"
+"checksum rand_pcg 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "086bd09a33c7044e56bb44d5bdde5a60e7f119a9e95b0775f545de759a32fe05"
+"checksum rand_xorshift 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "effa3fcaa47e18db002bdde6060944b6d2f9cfd8db471c30e873448ad9187be3"
"checksum rayon 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b614fe08b6665cb9a231d07ac1364b0ef3cb3698f1239ee0c4c3a88a524f54c8"
"checksum rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "373814f27745b2686b350dd261bfd24576a6fb0e2c5919b3a2b6005f820b0473"
"checksum rayon-core 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b055d1e92aba6877574d8fe604a63c8b5df60f60e5982bf7ccbb1338ea527356"
diff --git a/substrate/core/client/src/lib.rs b/substrate/core/client/src/lib.rs
index 58fced9687..a73671ca47 100644
--- a/substrate/core/client/src/lib.rs
+++ b/substrate/core/client/src/lib.rs
@@ -106,7 +106,7 @@ pub use client::{
new_with_backend,
new_in_mem,
BlockBody, BlockStatus, ImportNotifications, FinalityNotifications, BlockchainEvents,
- Client, ClientInfo, ChainHead,
+ BlockImportNotification, Client, ClientInfo, ChainHead,
};
#[cfg(feature = "std")]
pub use notifications::{StorageEventStream, StorageChangeSet};
diff --git a/substrate/core/finality-grandpa/Cargo.toml b/substrate/core/finality-grandpa/Cargo.toml
index 9fe2776744..6bf16e77d2 100644
--- a/substrate/core/finality-grandpa/Cargo.toml
+++ b/substrate/core/finality-grandpa/Cargo.toml
@@ -17,9 +17,10 @@ log = "0.4"
parking_lot = "0.4"
tokio = "0.1.7"
substrate-finality-grandpa-primitives = { path = "primitives" }
+rand = "0.6"
[dependencies.finality-grandpa]
-version = "0.3.0"
+version = "0.4.0"
features = ["derive-codec"]
[dev-dependencies]
diff --git a/substrate/core/finality-grandpa/src/communication.rs b/substrate/core/finality-grandpa/src/communication.rs
new file mode 100644
index 0000000000..8e06a0e7ca
--- /dev/null
+++ b/substrate/core/finality-grandpa/src/communication.rs
@@ -0,0 +1,281 @@
+// Copyright 2017-2018 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see .
+
+//! Incoming message streams that verify signatures, and outgoing message streams
+//! that sign or re-shape.
+
+use futures::prelude::*;
+use futures::sync::mpsc;
+use codec::{Encode, Decode};
+use substrate_primitives::{ed25519, AuthorityId};
+use runtime_primitives::traits::Block as BlockT;
+use {Error, Network, Message, SignedMessage, Commit, CompactCommit};
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+fn localized_payload(round: u64, set_id: u64, message: &E) -> Vec {
+ (message, round, set_id).encode()
+}
+
+// check a message.
+fn check_message_sig(
+ message: &Message,
+ id: &AuthorityId,
+ signature: &ed25519::Signature,
+ round: u64,
+ set_id: u64,
+) -> Result<(), ()> {
+ let as_public = ::ed25519::Public::from_raw(id.0);
+ let encoded_raw = localized_payload(round, set_id, message);
+ if ::ed25519::verify_strong(signature, &encoded_raw, as_public) {
+ Ok(())
+ } else {
+ debug!(target: "afg", "Bad signature on message from {:?}", id);
+ Err(())
+ }
+}
+
+/// converts a message stream into a stream of signed messages.
+/// the output stream checks signatures also.
+pub(crate) fn checked_message_stream(
+ round: u64,
+ set_id: u64,
+ inner: S,
+ voters: Arc>,
+)
+ -> impl Stream- ,Error=Error> where
+ S: Stream
- ,Error=()>
+{
+ inner
+ .filter_map(|raw| {
+ let decoded = SignedMessage::::decode(&mut &raw[..]);
+ if decoded.is_none() {
+ debug!(target: "afg", "Skipping malformed message {:?}", raw);
+ }
+ decoded
+ })
+ .and_then(move |msg| {
+ // check signature.
+ if !voters.contains_key(&msg.id) {
+ debug!(target: "afg", "Skipping message from unknown voter {}", msg.id);
+ return Ok(None);
+ }
+
+ // we ignore messages where the signature doesn't check out.
+ let res = check_message_sig::(
+ &msg.message,
+ &msg.id,
+ &msg.signature,
+ round,
+ set_id
+ );
+ Ok(res.map(move |()| msg).ok())
+ })
+ .filter_map(|x| x)
+ .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")))
+}
+
+struct OutgoingMessages {
+ round: u64,
+ set_id: u64,
+ locals: Option<(Arc, AuthorityId)>,
+ sender: mpsc::UnboundedSender>,
+ network: N,
+}
+
+impl Sink for OutgoingMessages {
+ type SinkItem = Message;
+ type SinkError = Error;
+
+ fn start_send(&mut self, msg: Message) -> StartSend, Error> {
+ // when locals exist, sign messages on import
+ if let Some((ref pair, local_id)) = self.locals {
+ let encoded = localized_payload(self.round, self.set_id, &msg);
+ let signature = pair.sign(&encoded[..]);
+ let signed = SignedMessage:: {
+ message: msg,
+ signature,
+ id: local_id,
+ };
+
+ // forward to network and to inner sender.
+ self.network.send_message(self.round, self.set_id, signed.encode());
+ let _ = self.sender.unbounded_send(signed);
+ }
+
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) }
+
+ fn close(&mut self) -> Poll<(), Error> {
+ // ignore errors since we allow this inner sender to be closed already.
+ self.sender.close().or_else(|_| Ok(Async::Ready(())))
+ }
+}
+
+impl Drop for OutgoingMessages {
+ fn drop(&mut self) {
+ self.network.drop_messages(self.round, self.set_id);
+ }
+}
+
+/// A sink for outgoing messages. This signs the messages with the key,
+/// if we are an authority. A stream for the signed messages is also returned.
+///
+/// A future can push unsigned messages into the sink. They will be automatically
+/// broadcast to the network. The returned stream should be combined with other input.
+pub(crate) fn outgoing_messages(
+ round: u64,
+ set_id: u64,
+ local_key: Option>,
+ voters: Arc>,
+ network: N,
+) -> (
+ impl Stream
- ,Error=Error>,
+ impl Sink,SinkError=Error>,
+) {
+ let locals = local_key.and_then(|pair| {
+ let public = pair.public();
+ let id = AuthorityId(public.0);
+ if voters.contains_key(&id) {
+ Some((pair, id))
+ } else {
+ None
+ }
+ });
+
+ let (tx, rx) = mpsc::unbounded();
+ let outgoing = OutgoingMessages:: {
+ round,
+ set_id,
+ network,
+ locals,
+ sender: tx,
+ };
+
+ let rx = rx.map_err(move |()| Error::Network(
+ format!("Failed to receive on unbounded receiver for round {}", round)
+ ));
+
+ (rx, outgoing)
+}
+
+fn check_compact_commit(
+ msg: CompactCommit,
+ voters: &HashMap,
+ round: u64,
+ set_id: u64,
+) -> Option> {
+ use grandpa::Message as GrandpaMessage;
+ if msg.precommits.len() != msg.auth_data.len() || msg.precommits.is_empty() {
+ debug!(target: "afg", "Skipping malformed compact commit");
+ return None;
+ }
+
+ // check signatures on all contained precommits.
+ for (precommit, &(ref sig, ref id)) in msg.precommits.iter().zip(&msg.auth_data) {
+ if !voters.contains_key(id) {
+ debug!(target: "afg", "Skipping commit containing unknown voter {}", id);
+ return None;
+ }
+
+ let res = check_message_sig::(
+ &GrandpaMessage::Precommit(precommit.clone()),
+ id,
+ sig,
+ round,
+ set_id,
+ );
+
+ if let Err(()) = res {
+ debug!(target: "afg", "Skipping commit containing bad message");
+ return None;
+ }
+ }
+
+ Some(msg)
+}
+
+/// A stream for incoming commit messages. This checks all the signatures on the
+/// messages.
+pub(crate) fn checked_commit_stream(
+ set_id: u64,
+ inner: S,
+ voters: Arc>,
+)
+ -> impl Stream
- ),Error=Error> where
+ S: Stream
- ,Error=()>
+{
+ inner
+ .filter_map(|raw| {
+ // this could be optimized by decoding piecewise.
+ let decoded = <(u64, CompactCommit)>::decode(&mut &raw[..]);
+ if decoded.is_none() {
+ trace!(target: "afg", "Skipping malformed commit message {:?}", raw);
+ }
+ decoded
+ })
+ .filter_map(move |(round, msg)| {
+ check_compact_commit::(msg, &*voters, round, set_id).map(move |c| (round, c))
+ })
+ .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")))
+}
+
+/// An output sink for commit messages.
+pub(crate) struct CommitsOut {
+ network: N,
+ set_id: u64,
+ _marker: ::std::marker::PhantomData,
+}
+
+impl CommitsOut {
+ /// Create a new commit output stream.
+ pub(crate) fn new(network: N, set_id: u64) -> Self {
+ CommitsOut {
+ network,
+ set_id,
+ _marker: Default::default(),
+ }
+ }
+}
+
+impl Sink for CommitsOut {
+ type SinkItem = (u64, Commit);
+ type SinkError = Error;
+
+ fn start_send(&mut self, input: (u64, Commit)) -> StartSend {
+ let (round, commit) = input;
+ let (precommits, auth_data) = commit.precommits.into_iter()
+ .map(|signed| (signed.precommit, (signed.signature, signed.id)))
+ .unzip();
+
+ let compact_commit = CompactCommit:: {
+ target_hash: commit.target_hash,
+ target_number: commit.target_number,
+ precommits,
+ auth_data
+ };
+
+ self.network.send_commit(self.set_id, Encode::encode(&(round, compact_commit)));
+
+ Ok(AsyncSink::Ready)
+ }
+
+ fn close(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) }
+ fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) }
+}
diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs
index 956b87df0e..9b48f5aa51 100644
--- a/substrate/core/finality-grandpa/src/lib.rs
+++ b/substrate/core/finality-grandpa/src/lib.rs
@@ -61,6 +61,7 @@ extern crate tokio;
extern crate parking_lot;
extern crate parity_codec as codec;
extern crate substrate_finality_grandpa_primitives as fg_primitives;
+extern crate rand;
#[macro_use]
extern crate log;
@@ -81,35 +82,40 @@ extern crate env_logger;
extern crate parity_codec_derive;
use futures::prelude::*;
-use futures::stream::Fuse;
use futures::sync::mpsc;
-use client::{Client, error::Error as ClientError, ImportNotifications, backend::Backend, CallExecutor};
+use client::{
+ Client, error::Error as ClientError, backend::Backend, CallExecutor, BlockchainEvents
+};
use client::blockchain::HeaderBackend;
use client::runtime_api::TaggedTransactionQueue;
use codec::{Encode, Decode};
use consensus_common::{BlockImport, ImportBlock, ImportResult, Authorities};
use runtime_primitives::traits::{
- NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi
+ NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi, Hash as HashT,
};
use fg_primitives::GrandpaApi;
use runtime_primitives::generic::BlockId;
use substrate_primitives::{ed25519, H256, AuthorityId, Blake2Hasher};
-use tokio::timer::Interval;
+use tokio::timer::Delay;
use grandpa::Error as GrandpaError;
use grandpa::{voter, round::State as RoundState, Equivocation, BlockNumberOps};
use network::{Service as NetworkService, ExHashT};
use network::consensus_gossip::{ConsensusMessage};
-use std::collections::{VecDeque, HashMap};
+use std::collections::HashMap;
+use std::fmt;
use std::sync::Arc;
use std::time::{Instant, Duration};
use authorities::SharedAuthoritySet;
+use until_imported::{UntilCommitBlocksImported, UntilVoteTargetImported};
pub use fg_primitives::ScheduledChange;
mod authorities;
+mod communication;
+mod until_imported;
#[cfg(feature="service-integration")]
mod service_integration;
@@ -138,6 +144,20 @@ pub type SignedMessage = grandpa::SignedMessage<
pub type Prevote = grandpa::Prevote<::Hash, NumberFor>;
/// A precommit message for this chain's block type.
pub type Precommit = grandpa::Precommit<::Hash, NumberFor>;
+/// A commit message for this chain's block type.
+pub type Commit = grandpa::Commit<
+ ::Hash,
+ NumberFor,
+ ed25519::Signature,
+ AuthorityId
+>;
+/// A compact commit message for this chain's block type.
+pub type CompactCommit = grandpa::CompactCommit<
+ ::Hash,
+ NumberFor,
+ ed25519::Signature,
+ AuthorityId
+>;
/// Configuration for the GRANDPA service.
#[derive(Clone)]
@@ -181,7 +201,7 @@ impl From for Error {
/// handle to a gossip service or similar.
///
/// Intended to be a lightweight handle such as an `Arc`.
-pub trait Network : Clone {
+pub trait Network: Clone {
/// A stream of input messages for a topic.
type In: Stream
- ,Error=()>;
@@ -194,6 +214,13 @@ pub trait Network : Clone {
/// Clean up messages for a round.
fn drop_messages(&self, round: u64, set_id: u64);
+
+ /// Get a stream of commit messages for a specific set-id. This stream
+ /// should never logically conclude.
+ fn commit_messages(&self, set_id: u64) -> Self::In;
+
+ /// Send message over the commit channel.
+ fn send_commit(&self, set_id: u64, message: Vec);
}
/// Bridge between NetworkService, gossiping consensus messages and Grandpa
@@ -208,7 +235,6 @@ impl, H: ExHashT
}
}
-
impl, H: ExHashT> Clone for NetworkBridge {
fn clone(&self) -> Self {
NetworkBridge {
@@ -218,10 +244,13 @@ impl, H: ExHashT
}
fn message_topic(round: u64, set_id: u64) -> B::Hash {
- use runtime_primitives::traits::Hash as HashT;
<::Hashing as HashT>::hash(format!("{}-{}", set_id, round).as_bytes())
}
+fn commit_topic(set_id: u64) -> B::Hash {
+ <::Hashing as HashT>::hash(format!("{}-COMMITS", set_id).as_bytes())
+}
+
impl, H: ExHashT> Network for NetworkBridge {
type In = mpsc::UnboundedReceiver;
fn messages_for(&self, round: u64, set_id: u64) -> Self::In {
@@ -240,6 +269,18 @@ impl, H: ExHashT
let topic = message_topic::(round, set_id);
self.service.consensus_gossip().write().collect_garbage(|t| t == &topic);
}
+
+ fn commit_messages(&self, set_id: u64) -> Self::In {
+ self.service.consensus_gossip().write().messages_for(commit_topic::(set_id))
+ }
+
+ fn send_commit(&self, set_id: u64, message: Vec) {
+ let topic = commit_topic::(set_id);
+ let gossip = self.service.consensus_gossip();
+ self.service.with_spec(move |_s, context|{
+ gossip.write().multicast(context, topic, message);
+ });
+ }
}
/// Something which can determine if a block is known.
@@ -262,269 +303,6 @@ impl, RA> BlockStatus for Arc {
- import_notifications: Fuse>,
- status_check: Status,
- inner: Fuse,
- ready: VecDeque>,
- check_pending: Interval,
- pending: HashMap>>,
-}
-
-impl, I: Stream> UntilImported {
- fn new(
- import_notifications: ImportNotifications,
- status_check: Status,
- stream: I,
- ) -> Self {
- // how often to check if pending messages that are waiting for blocks to be
- // imported can be checked.
- //
- // the import notifications interval takes care of most of this; this is
- // used in the event of missed import notifications
- const CHECK_PENDING_INTERVAL: Duration = Duration::from_secs(5);
- let now = Instant::now();
-
- let check_pending = Interval::new(now + CHECK_PENDING_INTERVAL, CHECK_PENDING_INTERVAL);
- UntilImported {
- import_notifications: import_notifications.fuse(),
- status_check,
- inner: stream.fuse(),
- ready: VecDeque::new(),
- check_pending,
- pending: HashMap::new(),
- }
- }
-}
-
-impl, I> Stream for UntilImported
- where I: Stream
- ,Error=Error>
-{
- type Item = SignedMessage;
- type Error = Error;
-
- fn poll(&mut self) -> Poll