grandpa: Voter persistence and upgrade to finality-grandpa v0.7 (#2139)

* core: grandpa: migrate to grandpa 0.7

* core: grandpa: store current round votes and load them on startup

* core: grandpa: resend old persisted votes for the current round

* core: grandpa: store base and votes for last completed round

* core: grandpa: fix latest grandpa 0.7 changes

* core: grandpa: update to grandpa 0.7.1

* core: grandpa: persist votes for last two completed rounds

* core: grandpa: simplify VoterSetState usage

* core: grandpa: use Environment::update_voter_set_state

* core: grandpa: fix aux_schema test

* core: grandpa: add docs

* core: grandpa: add note about environment assumption

* core: grandpa: don't update voter set state on ignored votes

* core: grandpa: add test for v1 -> v2 aux_schema migration

* core: grandpa: add test for voter vote persistence

* core: grandpa: use grandpa 0.7.1 from crates.io

* core: grandpa: use try_init in test

* core: grandpa: add comment about block_import in test

* core: grandpa: avoid cloning HasVoted

* core: grandpa: add missing docs

* core: grandpa: cleanup up can_propose/prevote/precommit
This commit is contained in:
André Silva
2019-04-08 12:50:34 +01:00
committed by Robert Habermeier
parent ed3ae4ac39
commit a1e15ae55a
11 changed files with 1020 additions and 190 deletions
+1 -1
View File
@@ -22,7 +22,7 @@ network = { package = "substrate-network", path = "../network" }
service = { package = "substrate-service", path = "../service", optional = true }
srml-finality-tracker = { path = "../../srml/finality-tracker" }
fg_primitives = { package = "substrate-finality-grandpa-primitives", path = "primitives" }
grandpa = { package = "finality-grandpa", version = "0.6.0", features = ["derive-codec"] }
grandpa = { package = "finality-grandpa", version = "0.7.1", features = ["derive-codec"] }
[dev-dependencies]
network = { package = "substrate-network", path = "../network", features = ["test-helpers"] }
@@ -19,7 +19,7 @@
use fork_tree::ForkTree;
use parking_lot::RwLock;
use substrate_primitives::ed25519;
use grandpa::VoterSet;
use grandpa::voter_set::VoterSet;
use parity_codec::{Encode, Decode};
use log::{debug, info};
use substrate_telemetry::{telemetry, CONSENSUS_INFO};
+302 -68
View File
@@ -23,10 +23,12 @@ use client::backend::AuxStore;
use client::error::{Result as ClientResult, Error as ClientError, ErrorKind as ClientErrorKind};
use fork_tree::ForkTree;
use grandpa::round::State as RoundState;
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use log::{info, warn};
use crate::authorities::{AuthoritySet, SharedAuthoritySet, PendingChange, DelayKind};
use crate::consensus_changes::{SharedConsensusChanges, ConsensusChanges};
use crate::environment::{CompletedRound, CompletedRounds, HasVoted, SharedVoterSetState, VoterSetState};
use crate::NewAuthoritySet;
use substrate_primitives::ed25519::Public as AuthorityId;
@@ -36,28 +38,18 @@ const SET_STATE_KEY: &[u8] = b"grandpa_completed_round";
const AUTHORITY_SET_KEY: &[u8] = b"grandpa_voters";
const CONSENSUS_CHANGES_KEY: &[u8] = b"grandpa_consensus_changes";
const CURRENT_VERSION: u32 = 1;
const CURRENT_VERSION: u32 = 2;
/// The voter set state.
#[derive(Debug, Clone, Encode, Decode)]
#[cfg_attr(test, derive(PartialEq))]
pub enum VoterSetState<H, N> {
pub enum V1VoterSetState<H, N> {
/// The voter set state, currently paused.
Paused(u64, RoundState<H, N>),
/// The voter set state, currently live.
Live(u64, RoundState<H, N>),
}
impl<H: Clone, N: Clone> VoterSetState<H, N> {
/// Yields the current state.
pub(crate) fn round(&self) -> (u64, RoundState<H, N>) {
match *self {
VoterSetState::Paused(n, ref s) => (n, s.clone()),
VoterSetState::Live(n, ref s) => (n, s.clone()),
}
}
}
type V0VoterSetState<H, N> = (u64, RoundState<H, N>);
#[derive(Debug, Clone, Encode, Decode, PartialEq)]
@@ -124,72 +116,205 @@ fn load_decode<B: AuxStore, T: Decode>(backend: &B, key: &[u8]) -> ClientResult<
}
/// Persistent data kept between runs.
pub(crate) struct PersistentData<H, N> {
pub(crate) authority_set: SharedAuthoritySet<H, N>,
pub(crate) consensus_changes: SharedConsensusChanges<H, N>,
pub(crate) set_state: VoterSetState<H, N>,
pub(crate) struct PersistentData<Block: BlockT> {
pub(crate) authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
pub(crate) consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
pub(crate) set_state: SharedVoterSetState<Block>,
}
fn migrate_from_version0<Block: BlockT, B, G>(
backend: &B,
genesis_round: &G,
) -> ClientResult<Option<(
AuthoritySet<Block::Hash, NumberFor<Block>>,
VoterSetState<Block>,
)>> where B: AuxStore,
G: Fn() -> RoundState<Block::Hash, NumberFor<Block>>,
{
CURRENT_VERSION.using_encoded(|s|
backend.insert_aux(&[(VERSION_KEY, s)], &[])
)?;
if let Some(old_set) = load_decode::<_, V0AuthoritySet<Block::Hash, NumberFor<Block>>>(
backend,
AUTHORITY_SET_KEY,
)? {
let new_set: AuthoritySet<Block::Hash, NumberFor<Block>> = old_set.into();
backend.insert_aux(&[(AUTHORITY_SET_KEY, new_set.encode().as_slice())], &[])?;
let (last_round_number, last_round_state) = match load_decode::<_, V0VoterSetState<Block::Hash, NumberFor<Block>>>(
backend,
SET_STATE_KEY,
)? {
Some((number, state)) => (number, state),
None => (0, genesis_round()),
};
let base = last_round_state.prevote_ghost
.expect("state is for completed round; completed rounds must have a prevote ghost; qed.");
let set_state = VoterSetState::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: last_round_number,
state: last_round_state,
votes: Vec::new(),
base,
}),
current_round: HasVoted::No,
};
backend.insert_aux(&[(SET_STATE_KEY, set_state.encode().as_slice())], &[])?;
return Ok(Some((new_set, set_state)));
}
Ok(None)
}
fn migrate_from_version1<Block: BlockT, B, G>(
backend: &B,
genesis_round: &G,
) -> ClientResult<Option<(
AuthoritySet<Block::Hash, NumberFor<Block>>,
VoterSetState<Block>,
)>> where B: AuxStore,
G: Fn() -> RoundState<Block::Hash, NumberFor<Block>>,
{
CURRENT_VERSION.using_encoded(|s|
backend.insert_aux(&[(VERSION_KEY, s)], &[])
)?;
if let Some(set) = load_decode::<_, AuthoritySet<Block::Hash, NumberFor<Block>>>(
backend,
AUTHORITY_SET_KEY,
)? {
let set_state = match load_decode::<_, V1VoterSetState<Block::Hash, NumberFor<Block>>>(
backend,
SET_STATE_KEY,
)? {
Some(V1VoterSetState::Paused(last_round_number, set_state)) => {
let base = set_state.prevote_ghost
.expect("state is for completed round; completed rounds must have a prevote ghost; qed.");
VoterSetState::Paused {
completed_rounds: CompletedRounds::new(CompletedRound {
number: last_round_number,
state: set_state,
votes: Vec::new(),
base,
}),
}
},
Some(V1VoterSetState::Live(last_round_number, set_state)) => {
let base = set_state.prevote_ghost
.expect("state is for completed round; completed rounds must have a prevote ghost; qed.");
VoterSetState::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: last_round_number,
state: set_state,
votes: Vec::new(),
base,
}),
current_round: HasVoted::No,
}
},
None => {
let set_state = genesis_round();
let base = set_state.prevote_ghost
.expect("state is for completed round; completed rounds must have a prevote ghost; qed.");
VoterSetState::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: 0,
state: set_state,
votes: Vec::new(),
base,
}),
current_round: HasVoted::No,
}
},
};
backend.insert_aux(&[(SET_STATE_KEY, set_state.encode().as_slice())], &[])?;
return Ok(Some((set, set_state)));
}
Ok(None)
}
/// Load or initialize persistent data from backend.
pub(crate) fn load_persistent<B, H, N, G>(
pub(crate) fn load_persistent<Block: BlockT, B, G>(
backend: &B,
genesis_hash: H,
genesis_number: N,
genesis_hash: Block::Hash,
genesis_number: NumberFor<Block>,
genesis_authorities: G,
)
-> ClientResult<PersistentData<H, N>>
-> ClientResult<PersistentData<Block>>
where
B: AuxStore,
H: Debug + Decode + Encode + Clone + PartialEq,
N: Debug + Decode + Encode + Clone + Ord,
G: FnOnce() -> ClientResult<Vec<(AuthorityId, u64)>>
G: FnOnce() -> ClientResult<Vec<(AuthorityId, u64)>>,
{
let version: Option<u32> = load_decode(backend, VERSION_KEY)?;
let consensus_changes = load_decode(backend, CONSENSUS_CHANGES_KEY)?
.unwrap_or_else(ConsensusChanges::<H, N>::empty);
.unwrap_or_else(ConsensusChanges::<Block::Hash, NumberFor<Block>>::empty);
let make_genesis_round = move || RoundState::genesis((genesis_hash, genesis_number));
match version {
None => {
CURRENT_VERSION.using_encoded(|s|
backend.insert_aux(&[(VERSION_KEY, s)], &[])
)?;
if let Some(old_set) = load_decode::<_, V0AuthoritySet<H, N>>(backend, AUTHORITY_SET_KEY)? {
let new_set: AuthoritySet<H, N> = old_set.into();
backend.insert_aux(&[(AUTHORITY_SET_KEY, new_set.encode().as_slice())], &[])?;
let set_state = match load_decode::<_, V0VoterSetState<H, N>>(backend, SET_STATE_KEY)? {
Some((number, state)) => {
let set_state = VoterSetState::Live(number, state);
backend.insert_aux(&[(SET_STATE_KEY, set_state.encode().as_slice())], &[])?;
set_state
},
None => VoterSetState::Live(0, make_genesis_round()),
};
if let Some((new_set, set_state)) = migrate_from_version0::<Block, _, _>(backend, &make_genesis_round)? {
return Ok(PersistentData {
authority_set: new_set.into(),
consensus_changes: Arc::new(consensus_changes.into()),
set_state,
set_state: set_state.into(),
});
}
}
},
Some(1) => {
if let Some(set) = load_decode::<_, AuthoritySet<H, N>>(backend, AUTHORITY_SET_KEY)? {
let set_state = match load_decode::<_, VoterSetState<H, N>>(backend, SET_STATE_KEY)? {
if let Some((new_set, set_state)) = migrate_from_version1::<Block, _, _>(backend, &make_genesis_round)? {
return Ok(PersistentData {
authority_set: new_set.into(),
consensus_changes: Arc::new(consensus_changes.into()),
set_state: set_state.into(),
});
}
},
Some(2) => {
if let Some(set) = load_decode::<_, AuthoritySet<Block::Hash, NumberFor<Block>>>(
backend,
AUTHORITY_SET_KEY,
)? {
let set_state = match load_decode::<_, VoterSetState<Block>>(
backend,
SET_STATE_KEY,
)? {
Some(state) => state,
None => VoterSetState::Live(0, make_genesis_round()),
None => {
let state = make_genesis_round();
let base = state.prevote_ghost
.expect("state is for completed round; completed rounds must have a prevote ghost; qed.");
VoterSetState::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: 0,
votes: Vec::new(),
base,
state,
}),
current_round: HasVoted::No,
}
}
};
return Ok(PersistentData {
authority_set: set.into(),
consensus_changes: Arc::new(consensus_changes.into()),
set_state,
set_state: set_state.into(),
});
}
}
},
Some(other) => return Err(ClientErrorKind::Backend(
format!("Unsupported GRANDPA DB version: {:?}", other)
).into()),
@@ -200,7 +325,19 @@ pub(crate) fn load_persistent<B, H, N, G>(
from genesis on what appears to be first startup.");
let genesis_set = AuthoritySet::genesis(genesis_authorities()?);
let genesis_state = VoterSetState::Live(0, make_genesis_round());
let state = make_genesis_round();
let base = state.prevote_ghost
.expect("state is for completed round; completed rounds must have a prevote ghost; qed.");
let genesis_state = VoterSetState::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: 0,
votes: Vec::new(),
state,
base,
}),
current_round: HasVoted::No,
};
backend.insert_aux(
&[
(AUTHORITY_SET_KEY, genesis_set.encode().as_slice()),
@@ -211,19 +348,17 @@ pub(crate) fn load_persistent<B, H, N, G>(
Ok(PersistentData {
authority_set: genesis_set.into(),
set_state: genesis_state,
set_state: genesis_state.into(),
consensus_changes: Arc::new(consensus_changes.into()),
})
}
/// Update the authority set on disk after a change.
pub(crate) fn update_authority_set<H, N, F, R>(
set: &AuthoritySet<H, N>,
new_set: Option<&NewAuthoritySet<H, N>>,
pub(crate) fn update_authority_set<Block: BlockT, F, R>(
set: &AuthoritySet<Block::Hash, NumberFor<Block>>,
new_set: Option<&NewAuthoritySet<Block::Hash, NumberFor<Block>>>,
write_aux: F
) -> R where
H: Encode + Clone,
N: Encode + Clone,
F: FnOnce(&[(&'static [u8], &[u8])]) -> R,
{
// write new authority set state to disk.
@@ -237,7 +372,15 @@ pub(crate) fn update_authority_set<H, N, F, R>(
new_set.canon_hash.clone(),
new_set.canon_number.clone(),
));
let set_state = VoterSetState::Live(0, round_state);
let set_state = VoterSetState::<Block>::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: 0,
state: round_state,
votes: Vec::new(),
base: (new_set.canon_hash, new_set.canon_number),
}),
current_round: HasVoted::No,
};
let encoded = set_state.encode();
write_aux(&[
@@ -250,10 +393,10 @@ pub(crate) fn update_authority_set<H, N, F, R>(
}
/// Write voter set state.
pub(crate) fn write_voter_set_state<B, H, N>(backend: &B, state: &VoterSetState<H, N>)
-> ClientResult<()>
where B: AuxStore, H: Encode, N: Encode
{
pub(crate) fn write_voter_set_state<Block: BlockT, B: AuxStore>(
backend: &B,
state: &VoterSetState<Block>,
) -> ClientResult<()> {
backend.insert_aux(
&[(SET_STATE_KEY, state.encode().as_slice())],
&[]
@@ -286,14 +429,14 @@ mod test {
use super::*;
#[test]
fn load_decode_migrates_data_format() {
fn load_decode_from_v0_migrates_data_format() {
let client = test_client::new();
let authorities = vec![(AuthorityId::default(), 100)];
let set_id = 3;
let round_number = 42;
let round_number: u64 = 42;
let round_state = RoundState::<H256, u64> {
prevote_ghost: None,
prevote_ghost: Some((H256::random(), 32)),
finalized: None,
estimate: None,
completable: false,
@@ -323,7 +466,7 @@ mod test {
);
// should perform the migration
load_persistent(
load_persistent::<test_client::runtime::Block, _, _>(
&client,
H256::random(),
0,
@@ -332,10 +475,10 @@ mod test {
assert_eq!(
load_decode::<_, u32>(&client, VERSION_KEY).unwrap(),
Some(1),
Some(2),
);
let PersistentData { authority_set, set_state, .. } = load_persistent(
let PersistentData { authority_set, set_state, .. } = load_persistent::<test_client::runtime::Block, _, _>(
&client,
H256::random(),
0,
@@ -353,8 +496,99 @@ mod test {
);
assert_eq!(
set_state,
VoterSetState::Live(round_number, round_state),
&*set_state.read(),
&VoterSetState::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: round_number,
state: round_state.clone(),
base: round_state.prevote_ghost.unwrap(),
votes: vec![],
}),
current_round: HasVoted::No,
},
);
}
#[test]
fn load_decode_from_v1_migrates_data_format() {
let client = test_client::new();
let authorities = vec![(AuthorityId::default(), 100)];
let set_id = 3;
let round_number: u64 = 42;
let round_state = RoundState::<H256, u64> {
prevote_ghost: Some((H256::random(), 32)),
finalized: None,
estimate: None,
completable: false,
};
{
let authority_set = AuthoritySet::<H256, u64> {
current_authorities: authorities.clone(),
pending_standard_changes: ForkTree::new(),
pending_forced_changes: Vec::new(),
set_id,
};
let voter_set_state = V1VoterSetState::Live(round_number, round_state.clone());
client.insert_aux(
&[
(AUTHORITY_SET_KEY, authority_set.encode().as_slice()),
(SET_STATE_KEY, voter_set_state.encode().as_slice()),
(VERSION_KEY, 1u32.encode().as_slice()),
],
&[],
).unwrap();
}
assert_eq!(
load_decode::<_, u32>(&client, VERSION_KEY).unwrap(),
Some(1),
);
// should perform the migration
load_persistent::<test_client::runtime::Block, _, _>(
&client,
H256::random(),
0,
|| unreachable!(),
).unwrap();
assert_eq!(
load_decode::<_, u32>(&client, VERSION_KEY).unwrap(),
Some(2),
);
let PersistentData { authority_set, set_state, .. } = load_persistent::<test_client::runtime::Block, _, _>(
&client,
H256::random(),
0,
|| unreachable!(),
).unwrap();
assert_eq!(
*authority_set.inner().read(),
AuthoritySet {
current_authorities: authorities,
pending_standard_changes: ForkTree::new(),
pending_forced_changes: Vec::new(),
set_id,
},
);
assert_eq!(
&*set_state.read(),
&VoterSetState::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: round_number,
state: round_state.clone(),
base: round_state.prevote_ghost.unwrap(),
votes: vec![],
}),
current_round: HasVoted::No,
},
);
}
}
@@ -29,8 +29,8 @@
use std::sync::Arc;
use grandpa::VoterSet;
use grandpa::Message::{Prevote, Precommit};
use grandpa::voter_set::VoterSet;
use grandpa::Message::{Prevote, Precommit, PrimaryPropose};
use futures::prelude::*;
use futures::sync::{oneshot, mpsc};
use log::{debug, trace};
@@ -43,6 +43,7 @@ use network::{consensus_gossip as network_gossip, Service as NetworkService,};
use network_gossip::ConsensusMessage;
use crate::{Error, Message, SignedMessage, Commit, CompactCommit};
use crate::environment::HasVoted;
use gossip::{
GossipMessage, FullCommitMessage, VoteOrPrecommitMessage, GossipValidator
};
@@ -184,7 +185,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
set_id: SetId,
voters: Arc<VoterSet<AuthorityId>>,
local_key: Option<Arc<ed25519::Pair>>,
has_voted: HasVoted,
has_voted: HasVoted<B>,
) -> (
impl Stream<Item=SignedMessage<B>,Error=Error>,
impl Sink<SinkItem=Message<B>,SinkError=Error>,
@@ -220,6 +221,13 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}
match &msg.message.message {
PrimaryPropose(propose) => {
telemetry!(CONSENSUS_INFO; "afg.received_propose";
"voter" => ?format!("{}", msg.message.id),
"target_number" => ?propose.target_number,
"target_hash" => ?propose.target_hash,
);
},
Prevote(prevote) => {
telemetry!(CONSENSUS_INFO; "afg.received_prevote";
"voter" => ?format!("{}", msg.message.id),
@@ -363,55 +371,20 @@ pub(crate) fn check_message_sig<Block: BlockT>(
}
}
/// Whether we've voted already during a prior run of the program.
#[derive(Decode, Encode)]
pub(crate) enum HasVoted {
/// Has not voted already in this round.
#[codec(index = "0")]
No,
/// Has cast a proposal.
#[codec(index = "1")]
Proposed,
/// Has cast a prevote.
#[codec(index = "2")]
Prevoted,
/// Has cast a precommit (implies prevote.)
#[codec(index = "3")]
Precommitted,
}
impl HasVoted {
#[allow(unused)]
fn can_propose(&self) -> bool {
match *self {
HasVoted::No => true,
HasVoted::Proposed | HasVoted::Prevoted | HasVoted::Precommitted => false,
}
}
fn can_prevote(&self) -> bool {
match *self {
HasVoted::No | HasVoted::Proposed => true,
HasVoted::Prevoted | HasVoted::Precommitted => false,
}
}
fn can_precommit(&self) -> bool {
match *self {
HasVoted::No | HasVoted::Proposed | HasVoted::Prevoted => true,
HasVoted::Precommitted => false,
}
}
}
/// A sink for outgoing messages to the network.
/// A sink for outgoing messages to the network. Any messages that are sent will
/// be replaced, as appropriate, according to the given `HasVoted`.
/// NOTE: The votes are stored unsigned, which means that the signatures need to
/// be "stable", i.e. we should end up with the exact same signed message if we
/// use the same raw message and key to sign. This is currently true for
/// `ed25519` and `BLS` signatures (which we might use in the future), care must
/// be taken when switching to different key types.
struct OutgoingMessages<Block: BlockT, N: Network<Block>> {
round: u64,
set_id: u64,
locals: Option<(Arc<ed25519::Pair>, AuthorityId)>,
sender: mpsc::UnboundedSender<SignedMessage<Block>>,
network: N,
has_voted: HasVoted,
has_voted: HasVoted<Block>,
}
impl<Block: BlockT, N: Network<Block>> Sink for OutgoingMessages<Block, N>
@@ -419,15 +392,25 @@ impl<Block: BlockT, N: Network<Block>> Sink for OutgoingMessages<Block, N>
type SinkItem = Message<Block>;
type SinkError = Error;
fn start_send(&mut self, msg: Message<Block>) -> StartSend<Message<Block>, Error> {
// only sign if we haven't voted in this round already.
let should_sign = match msg {
grandpa::Message::Prevote(_) => self.has_voted.can_prevote(),
grandpa::Message::Precommit(_) => self.has_voted.can_precommit(),
};
fn start_send(&mut self, mut msg: Message<Block>) -> StartSend<Message<Block>, Error> {
// if we've voted on this round previously under the same key, send that vote instead
match &mut msg {
grandpa::Message::PrimaryPropose(ref mut vote) =>
if let Some(propose) = self.has_voted.propose() {
*vote = propose.clone();
},
grandpa::Message::Prevote(ref mut vote) =>
if let Some(prevote) = self.has_voted.prevote() {
*vote = prevote.clone();
},
grandpa::Message::Precommit(ref mut vote) =>
if let Some(precommit) = self.has_voted.precommit() {
*vote = precommit.clone();
},
}
// when locals exist, sign messages on import
if let (true, &Some((ref pair, ref local_id))) = (should_sign, &self.locals) {
if let Some((ref pair, ref local_id)) = self.locals {
let encoded = localized_payload(self.round, self.set_id, &msg);
let signature = pair.sign(&encoded[..]);
@@ -14,11 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::collections::VecDeque;
use std::iter::FromIterator;
use std::sync::Arc;
use std::time::{Duration, Instant};
use log::{debug, warn, info};
use parity_codec::Encode;
use parity_codec::{Decode, Encode};
use futures::prelude::*;
use tokio::timer::Delay;
use parking_lot::RwLock;
@@ -27,7 +29,8 @@ use client::{
backend::Backend, BlockchainEvents, CallExecutor, Client, error::Error as ClientError
};
use grandpa::{
BlockNumberOps, Equivocation, Error as GrandpaError, round::State as RoundState, voter, VoterSet,
BlockNumberOps, Equivocation, Error as GrandpaError, round::State as RoundState,
voter, voter_set::VoterSet,
};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{
@@ -37,8 +40,8 @@ use substrate_primitives::{Blake2Hasher, ed25519, H256, Pair};
use substrate_telemetry::{telemetry, CONSENSUS_INFO};
use crate::{
Commit, Config, Error, Network, Precommit, Prevote,
CommandOrError, NewAuthoritySet, VoterCommand,
CommandOrError, Commit, Config, Error, Network, Precommit, Prevote,
PrimaryPropose, SignedMessage, NewAuthoritySet, VoterCommand,
};
use crate::authorities::SharedAuthoritySet;
@@ -49,27 +52,210 @@ use crate::until_imported::UntilVoteTargetImported;
use ed25519::Public as AuthorityId;
/// Data about a completed round.
pub(crate) type CompletedRound<H, N> = (u64, RoundState<H, N>);
/// A read-only view of the last completed round.
pub(crate) struct LastCompletedRound<H, N> {
inner: RwLock<CompletedRound<H, N>>,
#[derive(Debug, Clone, Decode, Encode, PartialEq)]
pub struct CompletedRound<Block: BlockT> {
/// The round number.
pub number: u64,
/// The round state (prevote ghost, estimate, finalized, etc.)
pub state: RoundState<Block::Hash, NumberFor<Block>>,
/// The target block base used for voting in the round.
pub base: (Block::Hash, NumberFor<Block>),
/// All the votes observed in the round.
pub votes: Vec<SignedMessage<Block>>,
}
impl<H: Clone, N: Clone> LastCompletedRound<H, N> {
/// Create a new tracker based on some starting last-completed round.
pub(crate) fn new(round: CompletedRound<H, N>) -> Self {
LastCompletedRound { inner: RwLock::new(round) }
// Data about last completed rounds. Stores NUM_LAST_COMPLETED_ROUNDS and always
// contains data about at least one round (genesis).
#[derive(Debug, Clone, PartialEq)]
pub struct CompletedRounds<Block: BlockT> {
inner: VecDeque<CompletedRound<Block>>,
}
// NOTE: the current strategy for persisting completed rounds is very naive
// (update everything) and we also rely on cloning to do atomic updates,
// therefore this value should be kept small for now.
const NUM_LAST_COMPLETED_ROUNDS: usize = 2;
impl<Block: BlockT> Encode for CompletedRounds<Block> {
fn encode(&self) -> Vec<u8> {
Vec::from_iter(&self.inner).encode()
}
}
impl<Block: BlockT> Decode for CompletedRounds<Block> {
fn decode<I: parity_codec::Input>(value: &mut I) -> Option<Self> {
Vec::<CompletedRound<Block>>::decode(value)
.map(|completed_rounds| CompletedRounds {
inner: completed_rounds.into(),
})
}
}
impl<Block: BlockT> CompletedRounds<Block> {
/// Create a new completed rounds tracker with NUM_LAST_COMPLETED_ROUNDS capacity.
pub fn new(genesis: CompletedRound<Block>) -> CompletedRounds<Block> {
let mut inner = VecDeque::with_capacity(NUM_LAST_COMPLETED_ROUNDS);
inner.push_back(genesis);
CompletedRounds { inner }
}
/// Read the last completed round.
pub(crate) fn read(&self) -> CompletedRound<H, N> {
self.inner.read().clone()
/// Returns the last (latest) completed round.
pub fn last(&self) -> &CompletedRound<Block> {
self.inner.back()
.expect("inner is never empty; always contains at least genesis; qed")
}
/// Push a new completed round, returns false if the given round is older
/// than the last completed round.
pub fn push(&mut self, completed_round: CompletedRound<Block>) -> bool {
if self.last().number >= completed_round.number {
return false;
}
if self.inner.len() == NUM_LAST_COMPLETED_ROUNDS {
self.inner.pop_front();
}
self.inner.push_back(completed_round);
true
}
}
/// The state of the current voter set, whether it is currently active or not
/// and information related to the previously completed rounds. Current round
/// voting status is used when restarting the voter, i.e. it will re-use the
/// previous votes for a given round if appropriate (same round and same local
/// key).
#[derive(Debug, Decode, Encode, PartialEq)]
pub enum VoterSetState<Block: BlockT> {
/// The voter is live, i.e. participating in rounds.
Live {
/// The previously completed rounds.
completed_rounds: CompletedRounds<Block>,
/// Vote status for the current round.
current_round: HasVoted<Block>,
},
/// The voter is paused, i.e. not casting or importing any votes.
Paused {
/// The previously completed rounds.
completed_rounds: CompletedRounds<Block>,
},
}
impl<Block: BlockT> VoterSetState<Block> {
/// Returns the last completed rounds.
pub(crate) fn completed_rounds(&self) -> CompletedRounds<Block> {
match self {
VoterSetState::Live { completed_rounds, .. } =>
completed_rounds.clone(),
VoterSetState::Paused { completed_rounds } =>
completed_rounds.clone(),
}
}
}
/// Whether we've voted already during a prior run of the program.
#[derive(Debug, Decode, Encode, PartialEq)]
pub enum HasVoted<Block: BlockT> {
/// Has not voted already in this round.
No,
/// Has voted in this round.
Yes(AuthorityId, Vote<Block>),
}
/// The votes cast by this voter already during a prior run of the program.
#[derive(Debug, Clone, Decode, Encode, PartialEq)]
pub enum Vote<Block: BlockT> {
/// Has cast a proposal.
Propose(PrimaryPropose<Block>),
/// Has cast a prevote.
Prevote(Option<PrimaryPropose<Block>>, Prevote<Block>),
/// Has cast a precommit (implies prevote.)
Precommit(Option<PrimaryPropose<Block>>, Prevote<Block>, Precommit<Block>),
}
impl<Block: BlockT> HasVoted<Block> {
/// Returns the proposal we should vote with (if any.)
pub fn propose(&self) -> Option<&PrimaryPropose<Block>> {
match self {
HasVoted::Yes(_, Vote::Propose(propose)) =>
Some(propose),
HasVoted::Yes(_, Vote::Prevote(propose, _)) | HasVoted::Yes(_, Vote::Precommit(propose, _, _)) =>
propose.as_ref(),
_ => None,
}
}
/// Returns the prevote we should vote with (if any.)
pub fn prevote(&self) -> Option<&Prevote<Block>> {
match self {
HasVoted::Yes(_, Vote::Prevote(_, prevote)) | HasVoted::Yes(_, Vote::Precommit(_, prevote, _)) =>
Some(prevote),
_ => None,
}
}
/// Returns the precommit we should vote with (if any.)
pub fn precommit(&self) -> Option<&Precommit<Block>> {
match self {
HasVoted::Yes(_, Vote::Precommit(_, _, precommit)) =>
Some(precommit),
_ => None,
}
}
/// Returns true if the voter can still propose, false otherwise.
pub fn can_propose(&self) -> bool {
self.propose().is_none()
}
/// Returns true if the voter can still prevote, false otherwise.
pub fn can_prevote(&self) -> bool {
self.prevote().is_none()
}
/// Returns true if the voter can still precommit, false otherwise.
pub fn can_precommit(&self) -> bool {
self.precommit().is_none()
}
}
/// A voter set state meant to be shared safely across multiple owners.
#[derive(Clone)]
pub struct SharedVoterSetState<Block: BlockT> {
inner: Arc<RwLock<VoterSetState<Block>>>,
}
impl<Block: BlockT> From<VoterSetState<Block>> for SharedVoterSetState<Block> {
fn from(set_state: VoterSetState<Block>) -> Self {
SharedVoterSetState::new(set_state)
}
}
impl<Block: BlockT> SharedVoterSetState<Block> {
/// Create a new shared voter set tracker with the given state.
pub(crate) fn new(state: VoterSetState<Block>) -> Self {
SharedVoterSetState { inner: Arc::new(RwLock::new(state)) }
}
/// Read the inner voter set state.
pub(crate) fn read(&self) -> parking_lot::RwLockReadGuard<VoterSetState<Block>> {
self.inner.read()
}
/// Return vote status information for the current round.
pub(crate) fn has_voted(&self) -> HasVoted<Block> {
match &*self.inner.read() {
VoterSetState::Live { current_round: HasVoted::Yes(id, vote), .. } =>
HasVoted::Yes(id.clone(), vote.clone()),
_ => HasVoted::No,
}
}
// NOTE: not exposed outside of this module intentionally.
fn with<F, R>(&self, f: F) -> R
where F: FnOnce(&mut CompletedRound<H, N>) -> R
where F: FnOnce(&mut VoterSetState<Block>) -> R
{
f(&mut *self.inner.write())
}
@@ -84,7 +270,23 @@ pub(crate) struct Environment<B, E, Block: BlockT, N: Network<Block>, RA> {
pub(crate) consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
pub(crate) network: crate::communication::NetworkBridge<Block, N>,
pub(crate) set_id: u64,
pub(crate) last_completed: LastCompletedRound<Block::Hash, NumberFor<Block>>,
pub(crate) voter_set_state: SharedVoterSetState<Block>,
}
impl<B, E, Block: BlockT, N: Network<Block>, RA> Environment<B, E, Block, N, RA> {
/// Updates the voter set state using the given closure. The write lock is
/// held during evaluation of the closure and the environment's voter set
/// state is set to its result if successful.
pub(crate) fn update_voter_set_state<F>(&self, f: F) -> Result<(), Error> where
F: FnOnce(&VoterSetState<Block>) -> Result<Option<VoterSetState<Block>>, Error>
{
self.voter_set_state.with(|voter_set_state| {
if let Some(set_state) = f(&voter_set_state)? {
*voter_set_state = set_state;
}
Ok(())
})
}
}
impl<Block: BlockT<Hash=H256>, B, E, N, RA> grandpa::Chain<Block::Hash, NumberFor<Block>> for Environment<B, E, Block, N, RA> where
@@ -226,7 +428,7 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
fn round_data(
&self,
round: u64
) -> voter::RoundData<Self::Timer, Self::In, Self::Out> {
) -> voter::RoundData<Self::Id, Self::Timer, Self::In, Self::Out> {
let now = Instant::now();
let prevote_timer = Delay::new(now + self.config.gossip_duration * 2);
let precommit_timer = Delay::new(now + self.config.gossip_duration * 4);
@@ -239,7 +441,7 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
crate::communication::SetId(self.set_id),
self.voters.clone(),
local_key.cloned(),
crate::communication::HasVoted::No,
self.voter_set_state.has_voted(),
);
// schedule incoming messages from the network to be held until
@@ -254,6 +456,7 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
let outgoing = Box::new(outgoing.sink_map_err(Into::into));
voter::RoundData {
voter_id: self.config.local_key.as_ref().map(|pair| pair.public().clone()),
prevote_timer: Box::new(prevote_timer.map_err(|e| Error::Timer(e).into())),
precommit_timer: Box::new(precommit_timer.map_err(|e| Error::Timer(e).into())),
incoming,
@@ -261,7 +464,131 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
}
}
fn completed(&self, round: u64, state: RoundState<Block::Hash, NumberFor<Block>>) -> Result<(), Self::Error> {
fn proposed(&self, _round: u64, propose: PrimaryPropose<Block>) -> Result<(), Self::Error> {
let local_id = self.config.local_key.as_ref()
.map(|pair| pair.public().into())
.filter(|id| self.voters.contains_key(&id));
let local_id = match local_id {
Some(id) => id,
None => return Ok(()),
};
self.update_voter_set_state(|voter_set_state| {
let completed_rounds = match voter_set_state {
VoterSetState::Live { completed_rounds, current_round: HasVoted::No } => completed_rounds,
VoterSetState::Live { current_round, .. } if !current_round.can_propose() => {
// we've already proposed in this round (in a previous run),
// ignore the given vote and don't update the voter set
// state
return Ok(None);
},
_ => {
let msg = "Voter proposing after prevote/precommit or while paused.";
return Err(Error::Safety(msg.to_string()));
},
};
let set_state = VoterSetState::<Block>::Live {
completed_rounds: completed_rounds.clone(),
current_round: HasVoted::Yes(local_id, Vote::Propose(propose)),
};
crate::aux_schema::write_voter_set_state(&**self.inner.backend(), &set_state)?;
Ok(Some(set_state))
})?;
Ok(())
}
fn prevoted(&self, _round: u64, prevote: Prevote<Block>) -> Result<(), Self::Error> {
let local_id = self.config.local_key.as_ref()
.map(|pair| pair.public().into())
.filter(|id| self.voters.contains_key(&id));
let local_id = match local_id {
Some(id) => id,
None => return Ok(()),
};
self.update_voter_set_state(|voter_set_state| {
let (completed_rounds, propose) = match voter_set_state {
VoterSetState::Live { completed_rounds, current_round: HasVoted::No } =>
(completed_rounds, None),
VoterSetState::Live { completed_rounds, current_round: HasVoted::Yes(_, Vote::Propose(propose)) } =>
(completed_rounds, Some(propose)),
VoterSetState::Live { current_round, .. } if !current_round.can_prevote() => {
// we've already prevoted in this round (in a previous run),
// ignore the given vote and don't update the voter set
// state
return Ok(None);
},
_ => {
let msg = "Voter prevoting after precommit or while paused.";
return Err(Error::Safety(msg.to_string()));
},
};
let set_state = VoterSetState::<Block>::Live {
completed_rounds: completed_rounds.clone(),
current_round: HasVoted::Yes(local_id, Vote::Prevote(propose.cloned(), prevote)),
};
crate::aux_schema::write_voter_set_state(&**self.inner.backend(), &set_state)?;
Ok(Some(set_state))
})?;
Ok(())
}
fn precommitted(&self, _round: u64, precommit: Precommit<Block>) -> Result<(), Self::Error> {
let local_id = self.config.local_key.as_ref()
.map(|pair| pair.public().into())
.filter(|id| self.voters.contains_key(&id));
let local_id = match local_id {
Some(id) => id,
None => return Ok(()),
};
self.update_voter_set_state(|voter_set_state| {
let (completed_rounds, propose, prevote) = match voter_set_state {
VoterSetState::Live { completed_rounds, current_round: HasVoted::Yes(_, Vote::Prevote(propose, prevote)) } =>
(completed_rounds, propose, prevote),
VoterSetState::Live { current_round, .. } if !current_round.can_precommit() => {
// we've already precommitted in this round (in a previous run),
// ignore the given vote and don't update the voter set
// state
return Ok(None);
},
_ => {
let msg = "Voter precommitting while paused.";
return Err(Error::Safety(msg.to_string()));
}
};
let set_state = VoterSetState::<Block>::Live {
completed_rounds: completed_rounds.clone(),
current_round: HasVoted::Yes(local_id, Vote::Precommit(propose.clone(), prevote.clone(), precommit)),
};
crate::aux_schema::write_voter_set_state(&**self.inner.backend(), &set_state)?;
Ok(Some(set_state))
})?;
Ok(())
}
fn completed(
&self,
round: u64,
state: RoundState<Block::Hash, NumberFor<Block>>,
base: (Block::Hash, NumberFor<Block>),
votes: Vec<SignedMessage<Block>>,
) -> Result<(), Self::Error> {
debug!(
target: "afg", "Voter {} completed round {} in set {}. Estimate = {:?}, Finalized in round = {:?}",
self.config.name(),
@@ -271,13 +598,31 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
state.finalized.as_ref().map(|e| e.1),
);
self.last_completed.with(|last_completed| {
let set_state = crate::aux_schema::VoterSetState::Live(round, state.clone());
self.update_voter_set_state(|voter_set_state| {
let mut completed_rounds = voter_set_state.completed_rounds();
// NOTE: the Environment assumes that rounds are *always* completed in-order.
if !completed_rounds.push(CompletedRound {
number: round,
state: state.clone(),
base,
votes,
}) {
let msg = "Voter completed round that is older than the last completed round.";
return Err(Error::Safety(msg.to_string()));
};
let set_state = VoterSetState::<Block>::Live {
completed_rounds,
current_round: HasVoted::No,
};
crate::aux_schema::write_voter_set_state(&**self.inner.backend(), &set_state)?;
*last_completed = (round, state); // after writing to DB successfully.
Ok(())
})
Ok(Some(set_state))
})?;
Ok(())
}
fn finalize_block(&self, hash: Block::Hash, number: NumberFor<Block>, round: u64, commit: Commit<Block>) -> Result<(), Self::Error> {
@@ -498,7 +843,7 @@ pub(crate) fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>(
};
if status.changed {
let write_result = crate::aux_schema::update_authority_set(
let write_result = crate::aux_schema::update_authority_set::<Block, _, _>(
&authority_set,
new_authorities.as_ref(),
|insert| client.apply_aux(import_op, insert, &[]),
@@ -29,7 +29,7 @@
//! The caller should track the `set_id`. The most straightforward way is to fetch finality
//! proofs ONLY for blocks on the tip of the chain and track the latest known `set_id`.
use grandpa::VoterSet;
use grandpa::voter_set::VoterSet;
use client::{
blockchain::Backend as BlockchainBackend,
@@ -362,7 +362,7 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> GrandpaBlockImport<B, E, Block, RA
AppliedChanges::None => None,
};
crate::aux_schema::update_authority_set(
crate::aux_schema::update_authority_set::<Block, _, _>(
authorities,
authorities_change,
|insert| block.auxiliary.extend(
@@ -21,7 +21,7 @@ use client::backend::Backend;
use client::blockchain::HeaderBackend;
use client::error::{Error as ClientError, ErrorKind as ClientErrorKind};
use parity_codec::{Encode, Decode};
use grandpa::VoterSet;
use grandpa::voter_set::VoterSet;
use grandpa::{Error as GrandpaError};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{NumberFor, Block as BlockT, Header as HeaderT};
@@ -129,7 +129,7 @@ impl<Block: BlockT<Hash=H256>> GrandpaJustification<Block> {
voters,
&ancestry_chain,
) {
Ok(Some(_)) => {},
Ok(ref result) if result.ghost().is_some() => {},
_ => {
let msg = "invalid commit in grandpa justification".to_string();
return Err(ClientErrorKind::BadJustification(msg).into());
+107 -33
View File
@@ -73,7 +73,7 @@ use substrate_telemetry::{telemetry, CONSENSUS_INFO, CONSENSUS_DEBUG, CONSENSUS_
use srml_finality_tracker;
use grandpa::Error as GrandpaError;
use grandpa::{voter, round::State as RoundState, BlockNumberOps, VoterSet};
use grandpa::{voter, round::State as RoundState, BlockNumberOps, voter_set::VoterSet};
use std::fmt;
use std::sync::Arc;
@@ -98,8 +98,8 @@ pub use service_integration::{LinkHalfForService, BlockImportForService};
pub use communication::Network;
pub use finality_proof::{prove_finality, check_finality_proof};
use aux_schema::{PersistentData, VoterSetState};
use environment::Environment;
use aux_schema::PersistentData;
use environment::{CompletedRound, CompletedRounds, Environment, HasVoted, SharedVoterSetState, VoterSetState};
use import::GrandpaBlockImport;
use until_imported::UntilCommitBlocksImported;
use communication::NetworkBridge;
@@ -119,6 +119,8 @@ pub type SignedMessage<Block> = grandpa::SignedMessage<
AuthorityId,
>;
/// A primary propose message for this chain's block type.
pub type PrimaryPropose<Block> = grandpa::PrimaryPropose<<Block as BlockT>::Hash, NumberFor<Block>>;
/// A prevote message for this chain's block type.
pub type Prevote<Block> = grandpa::Prevote<<Block as BlockT>::Hash, NumberFor<Block>>;
/// A precommit message for this chain's block type.
@@ -281,7 +283,7 @@ impl<H, N> fmt::Display for CommandOrError<H, N> {
pub struct LinkHalf<B, E, Block: BlockT<Hash=H256>, RA> {
client: Arc<Client<B, E, Block, RA>>,
persistent_data: PersistentData<Block::Hash, NumberFor<Block>>,
persistent_data: PersistentData<Block>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
}
@@ -335,6 +337,41 @@ pub fn block_import<B, E, Block: BlockT<Hash=H256>, RA, PRA>(
))
}
fn global_communication<Block: BlockT<Hash=H256>, I, O>(
commits_in: I,
commits_out: O,
) -> (
impl Stream<
Item = voter::CommunicationIn<H256, NumberFor<Block>, AuthoritySignature, AuthorityId>,
Error = CommandOrError<H256, NumberFor<Block>>,
>,
impl Sink<
SinkItem = voter::CommunicationOut<H256, NumberFor<Block>, AuthoritySignature, AuthorityId>,
SinkError = CommandOrError<H256, NumberFor<Block>>,
>,
) where
I: Stream<
Item = (u64, ::grandpa::CompactCommit<H256, NumberFor<Block>, AuthoritySignature, AuthorityId>),
Error = CommandOrError<H256, NumberFor<Block>>,
>,
O: Sink<
SinkItem = (u64, ::grandpa::Commit<H256, NumberFor<Block>, AuthoritySignature, AuthorityId>),
SinkError = CommandOrError<H256, NumberFor<Block>>,
>,
{
let global_in = commits_in.map(|(round, commit)| {
voter::CommunicationIn::Commit(round, commit, voter::Callback::Blank)
});
// NOTE: eventually this will also handle catch-up requests
let global_out = commits_out.with(|global| match global {
voter::CommunicationOut::Commit(round, commit) => Ok((round, commit)),
_ => unimplemented!(),
});
(global_in, global_out)
}
fn committer_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
local_key: Option<&Arc<ed25519::Pair>>,
set_id: u64,
@@ -453,19 +490,44 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
set_id: authority_set.set_id(),
authority_set: authority_set.clone(),
consensus_changes: consensus_changes.clone(),
last_completed: environment::LastCompletedRound::new(set_state.round()),
voter_set_state: set_state.clone(),
});
let initial_state = (initial_environment, set_state, voter_commands_rx.into_future());
initial_environment.update_voter_set_state(|voter_set_state| {
match voter_set_state {
VoterSetState::Live { current_round: HasVoted::Yes(id, _), completed_rounds } => {
let local_id = config.local_key.clone().map(|pair| pair.public());
let has_voted = match local_id {
Some(local_id) => if *id == local_id {
// keep the previous votes
return Ok(None);
} else {
HasVoted::No
},
_ => HasVoted::No,
};
// NOTE: only updated on disk when the voter first
// proposes/prevotes/precommits or completes a round.
Ok(Some(VoterSetState::Live {
current_round: has_voted,
completed_rounds: completed_rounds.clone(),
}))
},
_ => Ok(None),
}
}).expect("operation inside closure cannot fail; qed");
let initial_state = (initial_environment, voter_commands_rx.into_future());
let voter_work = future::loop_fn(initial_state, move |params| {
let (env, set_state, voter_commands_rx) = params;
let (env, voter_commands_rx) = params;
debug!(target: "afg", "{}: Starting new voter with set ID {}", config.name(), env.set_id);
telemetry!(CONSENSUS_DEBUG; "afg.starting_new_voter";
"name" => ?config.name(), "set_id" => ?env.set_id
);
let mut maybe_voter = match set_state.clone() {
VoterSetState::Live(last_round_number, last_round_state) => {
let mut maybe_voter = match &*env.voter_set_state.read() {
VoterSetState::Live { completed_rounds, .. } => {
let chain_info = match client.info() {
Ok(i) => i,
Err(e) => return future::Either::B(future::err(Error::Client(e))),
@@ -476,7 +538,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
chain_info.chain.finalized_number,
);
let committer_data = committer_communication(
let (commit_in, commit_out) = committer_communication(
config.local_key.as_ref(),
env.set_id,
&env.voters,
@@ -484,18 +546,25 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
&network,
);
let global_comms = global_communication::<Block, _, _>(
commit_in,
commit_out,
);
let voters = (*env.voters).clone();
let last_completed_round = completed_rounds.last();
Some(voter::Voter::new(
env.clone(),
voters,
committer_data,
last_round_number,
last_round_state,
global_comms,
last_completed_round.number,
last_completed_round.state.clone(),
last_finalized,
))
}
VoterSetState::Paused(_, _) => None,
},
VoterSetState::Paused { .. } => None,
};
// needs to be combined with another future otherwise it can deadlock.
@@ -526,6 +595,20 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
// start the new authority set using the block where the
// set changed (not where the signal happened!) as the base.
let genesis_state = RoundState::genesis((new.canon_hash, new.canon_number));
let set_state = VoterSetState::Live {
// always start at round 0 when changing sets.
completed_rounds: CompletedRounds::new(CompletedRound {
number: 0,
state: genesis_state,
base: (new.canon_hash, new.canon_number),
votes: Vec::new(),
}),
current_round: HasVoted::No,
};
let set_state: SharedVoterSetState<_> = set_state.into();
let env = Arc::new(Environment {
inner: client,
config,
@@ -534,32 +617,23 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
network,
authority_set,
consensus_changes,
last_completed: environment::LastCompletedRound::new(
(0, genesis_state.clone())
),
voter_set_state: set_state,
});
let set_state = VoterSetState::Live(
0, // always start at round 0 when changing sets.
genesis_state,
);
Ok(FutureLoop::Continue((env, set_state, voter_commands_rx)))
Ok(FutureLoop::Continue((env, voter_commands_rx)))
}
VoterCommand::Pause(reason) => {
info!(target: "afg", "Pausing old validator set: {}", reason);
// not racing because old voter is shut down.
let (last_round_number, last_round_state) = env.last_completed.read();
let set_state = VoterSetState::Paused(
last_round_number,
last_round_state,
);
env.update_voter_set_state(|voter_set_state| {
let completed_rounds = voter_set_state.completed_rounds();
let set_state = VoterSetState::Paused { completed_rounds };
aux_schema::write_voter_set_state(&**client.backend(), &set_state)?;
Ok(Some(set_state))
})?;
aux_schema::write_voter_set_state(&**client.backend(), &set_state)?;
Ok(FutureLoop::Continue((env, set_state, voter_commands_rx)))
Ok(FutureLoop::Continue((env, voter_commands_rx)))
},
}
};
@@ -1015,3 +1015,196 @@ fn test_bad_justification() {
ImportResult::AlreadyInChain
);
}
#[test]
fn voter_persists_its_votes() {
use std::iter::FromIterator;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::future;
use futures::sync::mpsc;
let _ = env_logger::try_init();
// we have two authorities but we'll only be running the voter for alice
// we are going to be listening for the prevotes it casts
let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob];
let voters = make_ids(peers);
// alice has a chain with 20 blocks
let mut net = GrandpaTestNet::new(TestApi::new(voters.clone()), 2);
net.peer(0).push_blocks(20, false);
net.sync();
assert_eq!(net.peer(0).client().info().unwrap().chain.best_number, 20,
"Peer #{} failed to sync", 0);
let mut runtime = current_thread::Runtime::new().unwrap();
let client = net.peer(0).client().clone();
let net = Arc::new(Mutex::new(net));
let (voter_tx, voter_rx) = mpsc::unbounded::<()>();
// startup a grandpa voter for alice but also listen for messages on a
// channel. whenever a message is received the voter is restarted. when the
// sender is dropped the voter is stopped.
{
let net = net.clone();
let voter = future::loop_fn(voter_rx, move |rx| {
let (_block_import, _, link) = net.lock().make_block_import(client.clone());
let link = link.lock().take().unwrap();
let mut voter = run_grandpa(
Config {
gossip_duration: TEST_GOSSIP_DURATION,
justification_period: 32,
local_key: Some(Arc::new(peers[0].clone().into())),
name: Some(format!("peer#{}", 0)),
},
link,
MessageRouting::new(net.clone(), 0),
InherentDataProviders::new(),
futures::empty(),
).expect("all in order with client and network");
let voter = future::poll_fn(move || {
// we need to keep the block_import alive since it owns the
// sender for the voter commands channel, if that gets dropped
// then the voter will stop
let _block_import = _block_import.clone();
voter.poll()
});
voter.select2(rx.into_future()).then(|res| match res {
Ok(future::Either::A(x)) => {
panic!("voter stopped unexpectedly: {:?}", x);
},
Ok(future::Either::B(((Some(()), rx), _))) => {
Ok(future::Loop::Continue(rx))
},
Ok(future::Either::B(((None, _), _))) => {
Ok(future::Loop::Break(()))
},
Err(future::Either::A(err)) => {
panic!("unexpected error: {:?}", err);
},
Err(future::Either::B(..)) => {
// voter_rx dropped, stop the voter.
Ok(future::Loop::Break(()))
},
})
});
runtime.spawn(voter);
}
let (exit_tx, exit_rx) = futures::sync::oneshot::channel::<()>();
// create the communication layer for bob, but don't start any
// voter. instead we'll listen for the prevote that alice casts
// and cast our own manually
{
let config = Config {
gossip_duration: TEST_GOSSIP_DURATION,
justification_period: 32,
local_key: Some(Arc::new(peers[1].clone().into())),
name: Some(format!("peer#{}", 1)),
};
let routing = MessageRouting::new(net.clone(), 1);
let network = communication::NetworkBridge::new(routing, config.clone());
let (round_rx, round_tx) = network.round_communication(
communication::Round(1),
communication::SetId(0),
Arc::new(VoterSet::from_iter(voters)),
Some(config.local_key.unwrap()),
HasVoted::No,
);
let round_tx = Arc::new(Mutex::new(round_tx));
let exit_tx = Arc::new(Mutex::new(Some(exit_tx)));
let net = net.clone();
let state = AtomicUsize::new(0);
runtime.spawn(round_rx.for_each(move |signed| {
if state.compare_and_swap(0, 1, Ordering::SeqCst) == 0 {
// the first message we receive should be a prevote from alice.
let prevote = match signed.message {
grandpa::Message::Prevote(prevote) => prevote,
_ => panic!("voter should prevote."),
};
// its chain has 20 blocks and the voter targets 3/4 of the
// unfinalized chain, so the vote should be for block 15
assert!(prevote.target_number == 15);
// we push 20 more blocks to alice's chain
net.lock().peer(0).push_blocks(20, false);
net.lock().sync();
assert_eq!(net.lock().peer(0).client().info().unwrap().chain.best_number, 40,
"Peer #{} failed to sync", 0);
let block_30_hash =
net.lock().peer(0).client().backend().blockchain().hash(30).unwrap().unwrap();
// we restart alice's voter
voter_tx.unbounded_send(()).unwrap();
// and we push our own prevote for block 30
let prevote = grandpa::Prevote {
target_number: 30,
target_hash: block_30_hash,
};
round_tx.lock().start_send(grandpa::Message::Prevote(prevote)).unwrap();
} else if state.compare_and_swap(1, 2, Ordering::SeqCst) == 1 {
// the next message we receive should be our own prevote
let prevote = match signed.message {
grandpa::Message::Prevote(prevote) => prevote,
_ => panic!("We should receive our own prevote."),
};
// targeting block 30
assert!(prevote.target_number == 30);
// after alice restarts it should send its previous prevote
// therefore we won't ever receive it again since it will be a
// known message on the gossip layer
} else if state.compare_and_swap(2, 3, Ordering::SeqCst) == 2 {
// we then receive a precommit from alice for block 15
// even though we casted a prevote for block 30
let precommit = match signed.message {
grandpa::Message::Precommit(precommit) => precommit,
_ => panic!("voter should precommit."),
};
assert!(precommit.target_number == 15);
// signal exit
exit_tx.clone().lock().take().unwrap().send(()).unwrap();
}
Ok(())
}).map_err(|_| ()));
}
let net = net.clone();
let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| {
net.lock().send_import_notifications();
net.lock().send_finality_notifications();
net.lock().route_fast();
Ok(())
})
.map(|_| ())
.map_err(|_| ());
let exit = exit_rx.into_future().map(|_| ()).map_err(|_| ());
runtime.block_on(drive_to_completion.select(exit).map(|_| ()).map_err(|_| ())).unwrap();
}