localize messages to set-index and add pending changes

This commit is contained in:
Robert Habermeier
2018-10-21 11:11:53 +02:00
parent fa62c8e9df
commit 515153fa85
4 changed files with 132 additions and 10 deletions
@@ -6,9 +6,11 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
futures = "0.1.17"
parity-codec = "2.1"
parity-codec-derive = "2.0"
sr-primitives = { path = "../sr-primitives" }
substrate-primitives = { path = "../primitives" }
substrate-client = { path = "../client" }
substrate-network = { path = "../network" }
log = "0.4"
tokio = "0.1.7"
@@ -0,0 +1,98 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Utilities for dealing with authorities, authority sets, and handoffs.
use substrate_primitives::AuthorityId;
use std::ops::Add;
/// A shared authority set.
pub(crate) struct SharedAuthoritySet<H, N> {
inner: RwLock<AuthoritySet<H, N>>,
}
impl<H, N> SharedAuthoritySet<H, N> {
/// The genesis authority set.
pub(crate) fn genesis(initial: Vec<(AuthorityId, usize)>) -> Self {
SharedAuthoritySet {
inner: RwLock::new(AuthoritySet {
current_authorities: initial,
set_id: 0,
pending_changes: Vec::new(),
})
}
}
/// Execute some work using the inner authority set.
pub(crate) fn with<F, U>(&self, f: F) -> U
where F: FnOnce(&AuthoritySet<H, N>) -> U
{
f(&*self.inner.read())
}
impl<H, N: Add + Clone> SharedAuthoritySet<H, N> {
/// Note an upcoming pending transition.
pub(crate) fn add_pending_change(&self, pending: PendingChange<H, N>) {
let idx = self.pending_changes
.binary_search_by_key(|change| change.effective_number())
.unwrap_or_else(|i| i);
self.pending_changes.insert(idx);
}
/// Get the earliest limit-block number, if any.
pub(crate) fn current_limit(&self) -> Option<&N> {
self.pending_changes.get(0).map(|change| &change.effective_number());
}
}
impl<H, N> From<AuthoritySet<H, N>> for SharedAuthoritySet<H, N> {
fn from(set: AuthoritySet<H, N>) -> Self {
SharedAuthoritySet { inner: RwLock::new(set) }
}
}
/// A set of authorities.
#[derive(Encode, Decode)]
pub(crate) struct AuthoritySet<H, N> {
current_authorities: Vec<(AuthorityId, usize)>,
set_id: u64,
pending_changes: Vec<PendingChange<H, N>>,
}
/// A pending change to the authority set.
///
/// This will be applied when the announcing block is at some depth within
/// the finalized chain.
#[derive(Encode, Decode)]
pub(crate) struct PendingChange<H, N> {
/// The new authorities and weights to apply.
pub(crate) next_authorities: Vec<(AuthorityId, usize)>,
/// How deep in the finalized chain the announcing block must be
/// before the change is applied.
pub(crate) finalization_depth: N,
/// The announcing block's height.
pub(crate) canon_height: N,
/// The announcing block's hash.
pub(crate) canon_hash: H,
}
impl<H, N: Add + Clone> PendingChange<H, N> {
/// Returns the effective number.
fn effective_number(&self) -> N {
self.canon_height + self.finalization_depth
}
}
+31 -10
View File
@@ -23,6 +23,7 @@ extern crate futures;
extern crate substrate_client as client;
extern crate sr_primitives as runtime_primitives;
extern crate substrate_primitives;
extern crate substrate_network as network;
extern crate tokio;
extern crate parity_codec as codec;
@@ -38,6 +39,9 @@ extern crate parking_lot;
#[cfg(test)]
extern crate substrate_keyring as keyring;
#[macro_use]
extern crate parity_codec_derive;
use futures::prelude::*;
use futures::stream::Fuse;
use futures::sync::mpsc;
@@ -57,7 +61,13 @@ use std::collections::{VecDeque, HashMap};
use std::sync::Arc;
use std::time::{Instant, Duration};
mod authorities;
const LAST_COMPLETED_KEY: &[u8] = b"grandpa_completed_round";
const AUTHORITY_SET_KEY: &[u8] = b"grandpa_voters";
/// round-number, round-state, set indicator
type LastCompleted<H> = (u64, RoundState<H>, u64);
/// A GRANDPA message for a substrate chain.
pub type Message<Block> = grandpa::Message<<Block as BlockT>::Hash>;
@@ -69,8 +79,7 @@ pub struct Config {
/// The expected duration for a message to be gossiped across the network.
pub gossip_duration: Duration,
/// The voters.
// TODO: make dynamic
pub voters: Vec<AuthorityId>,
pub genesis_voters: Vec<AuthorityId>,
/// The local signing key.
pub local_key: Option<Arc<ed25519::Pair>>,
}
@@ -298,15 +307,23 @@ impl<I, N: Network> Drop for ClearOnDrop<I, N> {
}
}
fn round_localized_payload<E: Encode>(round: u64, message: &E) -> Vec<u8> {
fn localized_payload<E: Encode>(round: u64, set_id: u64, message: &E) -> Vec<u8> {
let mut v = message.encode();
round.using_encoded(|s| v.extend(s));
set_id.using_encoded(|s| v.extend(s));
v
}
// converts a message stream into a stream of signed messages.
// the output stream checks signatures also.
fn checked_message_stream<Block: BlockT, S>(inner: S, round: u64, voters: Vec<AuthorityId>)
fn checked_message_stream<Block: BlockT, S>(
round: u64,
set_id: u64,
inner: S,
voters: Vec<AuthorityId>,
)
-> impl Stream<Item=SignedMessage<Block>,Error=Error> where
S: Stream<Item=Vec<u8>,Error=()>
{
@@ -326,7 +343,7 @@ fn checked_message_stream<Block: BlockT, S>(inner: S, round: u64, voters: Vec<Au
}
let as_public = ::ed25519::Public::from_raw(msg.id.0);
let encoded_raw = round_localized_payload(round, &msg.message);
let encoded_raw = localized_payload(round, set_id, &msg.message);
if ::ed25519::verify_strong(&msg.signature, &encoded_raw, as_public) {
Ok(Some(msg))
} else {
@@ -339,9 +356,10 @@ fn checked_message_stream<Block: BlockT, S>(inner: S, round: u64, voters: Vec<Au
}
fn outgoing_messages<Block: BlockT, N: Network>(
round: u64,
set_id: u64,
local_key: Option<Arc<ed25519::Pair>>,
voters: Vec<AuthorityId>,
round: u64,
network: N,
) -> (
impl Stream<Item=SignedMessage<Block>,Error=Error>,
@@ -357,7 +375,7 @@ fn outgoing_messages<Block: BlockT, N: Network>(
.map(move |msg: Message<Block>| {
// when locals exist, sign messages on import
if let Some((ref pair, local_id)) = locals {
let encoded = round_localized_payload(round, &msg);
let encoded = localized_payload(round, set_id, &msg);
let signature = pair.sign(&encoded[..]);
let signed = SignedMessage::<Block> {
message: msg,
@@ -480,17 +498,20 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash> for Environment<B,
let prevote_timer = Delay::new(now + self.config.gossip_duration * 2);
let precommit_timer = Delay::new(now + self.config.gossip_duration * 4);
let set_id = unimplemented!();
// TODO: dispatch this with `mpsc::spawn`.
let incoming = checked_message_stream::<Block, _>(
self.network.messages_for(round),
round,
self.config.voters.clone(),
set_id,
self.network.messages_for(round),
self.config.genesis_voters.clone(),
);
let (out_rx, outgoing) = outgoing_messages::<Block, _>(
round,
set_id,
self.config.local_key.clone(),
self.config.voters.clone(),
round,
self.network.clone(),
);