GRANDPA: add commit messages (#1151)

* get compiling with latest version of grandpa

* generalize UntilImported to prepare for waiting for commit message targets

* extract until_imported out to own module

* logic for blocking commits until enough blocks imported

* add tests for commit message blocking logic

* pass through commit mesage round number as well

* extract communication streams to own module

* add Error implementation for ExitOrError

* introduce stream adapter for checking commit messages

* output sink for commits

* implement the unimplemented

* remove extra line

* update to latest version of grandpa api

* update finality-grandpa to 0.4.0

* Use filter_map earlier when checking incoming commits messages

Co-Authored-By: rphmeier <rphmeier@gmail.com>

* address some grumbles
This commit is contained in:
Robert Habermeier
2018-11-26 16:24:26 +01:00
committed by GitHub
parent b7e0db725d
commit 59af4de4fd
7 changed files with 1093 additions and 293 deletions
+143 -288
View File
@@ -61,6 +61,7 @@ extern crate tokio;
extern crate parking_lot;
extern crate parity_codec as codec;
extern crate substrate_finality_grandpa_primitives as fg_primitives;
extern crate rand;
#[macro_use]
extern crate log;
@@ -81,35 +82,40 @@ extern crate env_logger;
extern crate parity_codec_derive;
use futures::prelude::*;
use futures::stream::Fuse;
use futures::sync::mpsc;
use client::{Client, error::Error as ClientError, ImportNotifications, backend::Backend, CallExecutor};
use client::{
Client, error::Error as ClientError, backend::Backend, CallExecutor, BlockchainEvents
};
use client::blockchain::HeaderBackend;
use client::runtime_api::TaggedTransactionQueue;
use codec::{Encode, Decode};
use consensus_common::{BlockImport, ImportBlock, ImportResult, Authorities};
use runtime_primitives::traits::{
NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi
NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi, Hash as HashT,
};
use fg_primitives::GrandpaApi;
use runtime_primitives::generic::BlockId;
use substrate_primitives::{ed25519, H256, AuthorityId, Blake2Hasher};
use tokio::timer::Interval;
use tokio::timer::Delay;
use grandpa::Error as GrandpaError;
use grandpa::{voter, round::State as RoundState, Equivocation, BlockNumberOps};
use network::{Service as NetworkService, ExHashT};
use network::consensus_gossip::{ConsensusMessage};
use std::collections::{VecDeque, HashMap};
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::{Instant, Duration};
use authorities::SharedAuthoritySet;
use until_imported::{UntilCommitBlocksImported, UntilVoteTargetImported};
pub use fg_primitives::ScheduledChange;
mod authorities;
mod communication;
mod until_imported;
#[cfg(feature="service-integration")]
mod service_integration;
@@ -138,6 +144,20 @@ pub type SignedMessage<Block> = grandpa::SignedMessage<
pub type Prevote<Block> = grandpa::Prevote<<Block as BlockT>::Hash, NumberFor<Block>>;
/// A precommit message for this chain's block type.
pub type Precommit<Block> = grandpa::Precommit<<Block as BlockT>::Hash, NumberFor<Block>>;
/// A commit message for this chain's block type.
pub type Commit<Block> = grandpa::Commit<
<Block as BlockT>::Hash,
NumberFor<Block>,
ed25519::Signature,
AuthorityId
>;
/// A compact commit message for this chain's block type.
pub type CompactCommit<Block> = grandpa::CompactCommit<
<Block as BlockT>::Hash,
NumberFor<Block>,
ed25519::Signature,
AuthorityId
>;
/// Configuration for the GRANDPA service.
#[derive(Clone)]
@@ -181,7 +201,7 @@ impl From<GrandpaError> for Error {
/// handle to a gossip service or similar.
///
/// Intended to be a lightweight handle such as an `Arc`.
pub trait Network : Clone {
pub trait Network: Clone {
/// A stream of input messages for a topic.
type In: Stream<Item=Vec<u8>,Error=()>;
@@ -194,6 +214,13 @@ pub trait Network : Clone {
/// Clean up messages for a round.
fn drop_messages(&self, round: u64, set_id: u64);
/// Get a stream of commit messages for a specific set-id. This stream
/// should never logically conclude.
fn commit_messages(&self, set_id: u64) -> Self::In;
/// Send message over the commit channel.
fn send_commit(&self, set_id: u64, message: Vec<u8>);
}
/// Bridge between NetworkService, gossiping consensus messages and Grandpa
@@ -208,7 +235,6 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT
}
}
impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT> Clone for NetworkBridge<B, S, H> {
fn clone(&self) -> Self {
NetworkBridge {
@@ -218,10 +244,13 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT
}
fn message_topic<B: BlockT>(round: u64, set_id: u64) -> B::Hash {
use runtime_primitives::traits::Hash as HashT;
<<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-{}", set_id, round).as_bytes())
}
fn commit_topic<B: BlockT>(set_id: u64) -> B::Hash {
<<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-COMMITS", set_id).as_bytes())
}
impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT> Network for NetworkBridge<B, S, H> {
type In = mpsc::UnboundedReceiver<ConsensusMessage>;
fn messages_for(&self, round: u64, set_id: u64) -> Self::In {
@@ -240,6 +269,18 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT
let topic = message_topic::<B>(round, set_id);
self.service.consensus_gossip().write().collect_garbage(|t| t == &topic);
}
fn commit_messages(&self, set_id: u64) -> Self::In {
self.service.consensus_gossip().write().messages_for(commit_topic::<B>(set_id))
}
fn send_commit(&self, set_id: u64, message: Vec<u8>) {
let topic = commit_topic::<B>(set_id);
let gossip = self.service.consensus_gossip();
self.service.with_spec(move |_s, context|{
gossip.write().multicast(context, topic, message);
});
}
}
/// Something which can determine if a block is known.
@@ -262,269 +303,6 @@ impl<B, E, Block: BlockT<Hash=H256>, RA> BlockStatus<Block> for Arc<Client<B, E,
}
}
/// Buffering imported messages until blocks with given hashes are imported.
struct UntilImported<Block: BlockT, Status, I> {
import_notifications: Fuse<ImportNotifications<Block>>,
status_check: Status,
inner: Fuse<I>,
ready: VecDeque<SignedMessage<Block>>,
check_pending: Interval,
pending: HashMap<Block::Hash, Vec<SignedMessage<Block>>>,
}
impl<Block: BlockT, Status: BlockStatus<Block>, I: Stream> UntilImported<Block, Status, I> {
fn new(
import_notifications: ImportNotifications<Block>,
status_check: Status,
stream: I,
) -> Self {
// how often to check if pending messages that are waiting for blocks to be
// imported can be checked.
//
// the import notifications interval takes care of most of this; this is
// used in the event of missed import notifications
const CHECK_PENDING_INTERVAL: Duration = Duration::from_secs(5);
let now = Instant::now();
let check_pending = Interval::new(now + CHECK_PENDING_INTERVAL, CHECK_PENDING_INTERVAL);
UntilImported {
import_notifications: import_notifications.fuse(),
status_check,
inner: stream.fuse(),
ready: VecDeque::new(),
check_pending,
pending: HashMap::new(),
}
}
}
impl<Block: BlockT, Status: BlockStatus<Block>, I> Stream for UntilImported<Block, Status, I>
where I: Stream<Item=SignedMessage<Block>,Error=Error>
{
type Item = SignedMessage<Block>;
type Error = Error;
fn poll(&mut self) -> Poll<Option<SignedMessage<Block>>, Error> {
loop {
match self.inner.poll() {
Err(e) => return Err(e),
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
Ok(Async::Ready(Some(signed_message))) => {
let (&target_hash, target_number) = signed_message.target();
// new message: hold it until the block is known.
if let Some(number) = self.status_check.block_number(target_hash)? {
if number != target_number {
warn!(
target: "afg",
"Authority {:?} signed GRANDPA message with \
wrong block number for hash {}",
signed_message.id,
target_hash
);
} else {
self.ready.push_back(signed_message)
}
} else {
self.pending.entry(target_hash)
.or_insert_with(Vec::new)
.push(signed_message);
}
}
Ok(Async::NotReady) => break,
}
}
loop {
match self.import_notifications.poll() {
Err(_) => return Err(Error::Network(format!("Failed to get new message"))),
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
Ok(Async::Ready(Some(notification))) => {
// new block imported. queue up all messages tied to that hash.
if let Some(messages) = self.pending.remove(&notification.hash) {
self.ready.extend(messages);
}
}
Ok(Async::NotReady) => break,
}
}
let mut update_interval = false;
while let Async::Ready(Some(_)) = self.check_pending.poll().map_err(Error::Timer)? {
update_interval = true;
}
if update_interval {
let mut known_keys = Vec::new();
for &block_hash in self.pending.keys() {
if let Some(number) = self.status_check.block_number(block_hash)? {
known_keys.push((block_hash, number));
}
}
for (known_hash, canon_number) in known_keys {
if let Some(mut pending_messages) = self.pending.remove(&known_hash) {
// verify canonicality of pending messages.
pending_messages.retain(|msg| {
let number_correct = msg.target().1 == canon_number;
if !number_correct {
warn!(
target: "afg",
"Authority {:?} signed GRANDPA message with \
wrong block number for hash {}",
msg.id,
known_hash,
);
}
number_correct
});
self.ready.extend(pending_messages);
}
}
}
if let Some(ready) = self.ready.pop_front() {
return Ok(Async::Ready(Some(ready)))
}
if self.import_notifications.is_done() && self.inner.is_done() {
Ok(Async::Ready(None))
} else {
Ok(Async::NotReady)
}
}
}
// clears the network messages for inner round on drop.
struct ClearOnDrop<I, N: Network> {
round: u64,
set_id: u64,
inner: I,
network: N,
}
impl<I: Sink, N: Network> Sink for ClearOnDrop<I, N> {
type SinkItem = I::SinkItem;
type SinkError = I::SinkError;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.inner.poll_complete()
}
fn close(&mut self) -> Poll<(), Self::SinkError> {
self.inner.close()
}
}
impl<I, N: Network> Drop for ClearOnDrop<I, N> {
fn drop(&mut self) {
self.network.drop_messages(self.round, self.set_id);
}
}
fn localized_payload<E: Encode>(round: u64, set_id: u64, message: &E) -> Vec<u8> {
let mut v = message.encode();
round.using_encoded(|s| v.extend(s));
set_id.using_encoded(|s| v.extend(s));
v
}
// converts a message stream into a stream of signed messages.
// the output stream checks signatures also.
fn checked_message_stream<Block: BlockT, S>(
round: u64,
set_id: u64,
inner: S,
voters: Arc<HashMap<AuthorityId, u64>>,
)
-> impl Stream<Item=SignedMessage<Block>,Error=Error> where
S: Stream<Item=Vec<u8>,Error=()>
{
inner
.filter_map(|raw| {
let decoded = SignedMessage::<Block>::decode(&mut &raw[..]);
if decoded.is_none() {
debug!(target: "afg", "Skipping malformed message {:?}", raw);
}
decoded
})
.and_then(move |msg| {
// check signature.
if !voters.contains_key(&msg.id) {
debug!(target: "afg", "Skipping message from unknown voter {}", msg.id);
return Ok(None);
}
let as_public = ::ed25519::Public::from_raw(msg.id.0);
let encoded_raw = localized_payload(round, set_id, &msg.message);
if ::ed25519::verify_strong(&msg.signature, &encoded_raw, as_public) {
Ok(Some(msg))
} else {
debug!(target: "afg", "Skipping message with bad signature");
Ok(None)
}
})
.filter_map(|x| x)
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")))
}
fn outgoing_messages<Block: BlockT, N: Network>(
round: u64,
set_id: u64,
local_key: Option<Arc<ed25519::Pair>>,
voters: Arc<HashMap<AuthorityId, u64>>,
network: N,
) -> (
impl Stream<Item=SignedMessage<Block>,Error=Error>,
impl Sink<SinkItem=Message<Block>,SinkError=Error>,
) {
let locals = local_key.and_then(|pair| {
let public = pair.public();
let id = AuthorityId(public.0);
if voters.contains_key(&id) {
Some((pair, id))
} else {
None
}
});
let (tx, rx) = mpsc::unbounded();
let rx = rx
.map(move |msg: Message<Block>| {
// when locals exist, sign messages on import
if let Some((ref pair, local_id)) = locals {
let encoded = localized_payload(round, set_id, &msg);
let signature = pair.sign(&encoded[..]);
let signed = SignedMessage::<Block> {
message: msg,
signature,
id: local_id,
};
// forward to network.
network.send_message(round, set_id, signed.encode());
Some(signed)
} else {
None
}
})
.filter_map(|x| x)
.map_err(move |()| Error::Network(
format!("Failed to receive on unbounded receiver for round {}", round)
));
let tx = tx.sink_map_err(move |e| Error::Network(format!("Failed to broadcast message \
to network in round {}: {:?}", round, e)));
(rx, tx)
}
/// The environment we run GRANDPA in.
struct Environment<B, E, Block: BlockT, N: Network, RA> {
inner: Arc<Client<B, E, Block, RA>>,
@@ -633,6 +411,17 @@ impl<H, N> From<grandpa::Error> for ExitOrError<H, N> {
}
}
impl<H: fmt::Debug, N: fmt::Debug> ::std::error::Error for ExitOrError<H, N> { }
impl<H, N> fmt::Display for ExitOrError<H, N> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
ExitOrError::Error(ref e) => write!(f, "{:?}", e),
ExitOrError::AuthoritiesChanged(_) => write!(f, "restarting voter on new authorities"),
}
}
}
impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, NumberFor<Block>> for Environment<B, E, Block, N, RA> where
Block: 'static,
B: Backend<Block, Blake2Hasher> + 'static,
@@ -645,6 +434,8 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
type Timer = Box<dyn Future<Item = (), Error = Self::Error> + Send>;
type Id = AuthorityId;
type Signature = ed25519::Signature;
// regular round message streams
type In = Box<dyn Stream<
Item = ::grandpa::SignedMessage<Block::Hash, NumberFor<Block>, Self::Signature, Self::Id>,
Error = Self::Error,
@@ -653,29 +444,26 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
SinkItem = ::grandpa::Message<Block::Hash, NumberFor<Block>>,
SinkError = Self::Error,
> + Send>;
type Error = ExitOrError<Block::Hash, NumberFor<Block>>;
#[allow(unreachable_code)]
fn round_data(
&self,
round: u64
) -> voter::RoundData<Self::Timer, Self::Id, Self::In, Self::Out> {
use client::BlockchainEvents;
use tokio::timer::Delay;
) -> voter::RoundData<Self::Timer, Self::In, Self::Out> {
let now = Instant::now();
let prevote_timer = Delay::new(now + self.config.gossip_duration * 2);
let precommit_timer = Delay::new(now + self.config.gossip_duration * 4);
// TODO: dispatch this with `mpsc::spawn`.
let incoming = checked_message_stream::<Block, _>(
let incoming = ::communication::checked_message_stream::<Block, _>(
round,
self.set_id,
self.network.messages_for(round, self.set_id),
self.voters.clone(),
);
let (out_rx, outgoing) = outgoing_messages::<Block, _>(
let (out_rx, outgoing) = ::communication::outgoing_messages::<Block, _>(
round,
self.set_id,
self.config.local_key.clone(),
@@ -685,7 +473,7 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
// schedule incoming messages from the network to be held until
// corresponding blocks are imported.
let incoming = UntilImported::new(
let incoming = UntilVoteTargetImported::new(
self.inner.import_notification_stream(),
self.inner.clone(),
incoming,
@@ -695,17 +483,11 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
let incoming = Box::new(out_rx.select(incoming).map_err(Into::into));
// schedule network message cleanup when sink drops.
let outgoing = Box::new(ClearOnDrop {
round,
set_id: self.set_id,
network: self.network.clone(),
inner: outgoing.sink_map_err(Into::into),
});
let outgoing = Box::new(outgoing.sink_map_err(Into::into));
voter::RoundData {
prevote_timer: Box::new(prevote_timer.map_err(|e| Error::Timer(e).into())),
precommit_timer: Box::new(precommit_timer.map_err(|e| Error::Timer(e).into())),
voters: (&*self.voters).clone(),
incoming,
outgoing,
}
@@ -732,7 +514,7 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
}
}
fn finalize_block(&self, hash: Block::Hash, number: NumberFor<Block>) -> Result<(), Self::Error> {
fn finalize_block(&self, hash: Block::Hash, number: NumberFor<Block>, _commit: Commit<Block>) -> Result<(), Self::Error> {
// ideally some handle to a synchronization oracle would be used
// to avoid unconditionally notifying.
if let Err(e) = self.inner.finalize_block(BlockId::Hash(hash), true) {
@@ -794,6 +576,7 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
} else {
info!("Applying GRANDPA set change to new set {:?}", set_ref);
}
Err(ExitOrError::AuthoritiesChanged(NewAuthoritySet {
canon_hash,
canon_number,
@@ -805,6 +588,16 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
}
}
fn round_commit_timer(&self) -> Self::Timer {
use rand::{thread_rng, Rng};
//random between 0-1 seconds.
let delay: u64 = thread_rng().gen_range(0, 1000);
Box::new(Delay::new(
Instant::now() + Duration::from_millis(delay)
).map_err(|e| Error::Timer(e).into()))
}
fn prevote_equivocation(
&self,
_round: u64,
@@ -969,6 +762,52 @@ pub fn block_import<B, E, Block: BlockT<Hash=H256>, RA, PRA>(
))
}
fn committer_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
set_id: u64,
voters: &Arc<HashMap<AuthorityId, u64>>,
client: &Arc<Client<B, E, Block, RA>>,
network: &N,
) -> (
impl Stream<
Item = (u64, ::grandpa::CompactCommit<H256, NumberFor<Block>, ed25519::Signature, AuthorityId>),
Error = ExitOrError<H256, NumberFor<Block>>,
>,
impl Sink<
SinkItem = (u64, ::grandpa::Commit<H256, NumberFor<Block>, ed25519::Signature, AuthorityId>),
SinkError = ExitOrError<H256, NumberFor<Block>>,
>,
) where
B: Backend<Block, Blake2Hasher>,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync,
N: Network,
RA: Send + Sync,
NumberFor<Block>: BlockNumberOps,
{
// verification stream
let commit_in = ::communication::checked_commit_stream::<Block, _>(
set_id,
network.commit_messages(set_id),
voters.clone(),
);
// block commit messages until relevant blocks are imported.
let commit_in = UntilCommitBlocksImported::new(
client.import_notification_stream(),
client.clone(),
commit_in,
);
let commit_out = ::communication::CommitsOut::<Block, _>::new(
network.clone(),
set_id,
);
let commit_in = commit_in.map_err(Into::into);
let commit_out = commit_out.sink_map_err(Into::into);
(commit_in, commit_out)
}
/// Run a GRANDPA voter as a task. Provide configuration and a link to a
/// block import worker that has already been instantiated with `block_import`.
pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
@@ -1027,7 +866,23 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
chain_info.chain.finalized_number,
);
let voter = voter::Voter::new(env, last_round_number, last_state, last_finalized);
let committer_data = committer_communication(
env.set_id,
&env.voters,
&client,
&network,
);
let voters = (*env.voters).clone();
let voter = voter::Voter::new(
env,
voters,
committer_data,
last_round_number,
last_state,
last_finalized,
);
let client = client.clone();
let config = config.clone();
let network = network.clone();