polite-grandpa: Ensure only votes from a set's voters are propagated (#2720)

* make authorities available to gossip

* make local view optional until set

* address grumbles

* micro-optimize: encode without lock held
This commit is contained in:
Robert Habermeier
2019-06-09 11:38:41 +02:00
committed by Gavin Wood
parent ac895023be
commit 67138b3103
6 changed files with 327 additions and 147 deletions
@@ -151,16 +151,22 @@ fn migrate_from_version0<Block: BlockT, B, G>(
None => (0, genesis_round()),
};
let set_id = new_set.current().0;
let base = last_round_state.prevote_ghost
.expect("state is for completed round; completed rounds must have a prevote ghost; qed.");
let set_state = VoterSetState::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: last_round_number,
state: last_round_state,
votes: Vec::new(),
base,
}),
completed_rounds: CompletedRounds::new(
CompletedRound {
number: last_round_number,
state: last_round_state,
votes: Vec::new(),
base,
},
set_id,
&new_set,
),
current_round: HasVoted::No,
};
@@ -189,6 +195,19 @@ fn migrate_from_version1<Block: BlockT, B, G>(
backend,
AUTHORITY_SET_KEY,
)? {
let set_id = set.current().0;
let completed_rounds = |number, state, base| CompletedRounds::new(
CompletedRound {
number,
state,
votes: Vec::new(),
base,
},
set_id,
&set,
);
let set_state = match load_decode::<_, V1VoterSetState<Block::Hash, NumberFor<Block>>>(
backend,
SET_STATE_KEY,
@@ -198,12 +217,7 @@ fn migrate_from_version1<Block: BlockT, B, G>(
.expect("state is for completed round; completed rounds must have a prevote ghost; qed.");
VoterSetState::Paused {
completed_rounds: CompletedRounds::new(CompletedRound {
number: last_round_number,
state: set_state,
votes: Vec::new(),
base,
}),
completed_rounds: completed_rounds(last_round_number, set_state, base),
}
},
Some(V1VoterSetState::Live(last_round_number, set_state)) => {
@@ -211,12 +225,7 @@ fn migrate_from_version1<Block: BlockT, B, G>(
.expect("state is for completed round; completed rounds must have a prevote ghost; qed.");
VoterSetState::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: last_round_number,
state: set_state,
votes: Vec::new(),
base,
}),
completed_rounds: completed_rounds(last_round_number, set_state, base),
current_round: HasVoted::No,
}
},
@@ -226,12 +235,7 @@ fn migrate_from_version1<Block: BlockT, B, G>(
.expect("state is for completed round; completed rounds must have a prevote ghost; qed.");
VoterSetState::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: 0,
state: set_state,
votes: Vec::new(),
base,
}),
completed_rounds: completed_rounds(0, set_state, base),
current_round: HasVoted::No,
}
},
@@ -298,12 +302,16 @@ pub(crate) fn load_persistent<Block: BlockT, B, G>(
.expect("state is for completed round; completed rounds must have a prevote ghost; qed.");
VoterSetState::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: 0,
votes: Vec::new(),
base,
state,
}),
completed_rounds: CompletedRounds::new(
CompletedRound {
number: 0,
votes: Vec::new(),
base,
state,
},
set.current().0,
&set,
),
current_round: HasVoted::No,
}
}
@@ -325,18 +333,23 @@ pub(crate) fn load_persistent<Block: BlockT, B, G>(
info!(target: "afg", "Loading GRANDPA authority set \
from genesis on what appears to be first startup.");
let genesis_set = AuthoritySet::genesis(genesis_authorities()?);
let genesis_authorities = genesis_authorities()?;
let genesis_set = AuthoritySet::genesis(genesis_authorities.clone());
let state = make_genesis_round();
let base = state.prevote_ghost
.expect("state is for completed round; completed rounds must have a prevote ghost; qed.");
let genesis_state = VoterSetState::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: 0,
votes: Vec::new(),
state,
base,
}),
completed_rounds: CompletedRounds::new(
CompletedRound {
number: 0,
votes: Vec::new(),
state,
base,
},
0,
&genesis_set,
),
current_round: HasVoted::No,
};
backend.insert_aux(
@@ -355,6 +368,10 @@ pub(crate) fn load_persistent<Block: BlockT, B, G>(
}
/// Update the authority set on disk after a change.
///
/// If there has just been a handoff, pass a `new_set` parameter that describes the
/// handoff. `set` in all cases should reflect the current authority set, with all
/// changes and handoffs applied.
pub(crate) fn update_authority_set<Block: BlockT, F, R>(
set: &AuthoritySet<Block::Hash, NumberFor<Block>>,
new_set: Option<&NewAuthoritySet<Block::Hash, NumberFor<Block>>>,
@@ -385,12 +402,16 @@ pub(crate) fn update_authority_set<Block: BlockT, F, R>(
new_set.canon_number.clone(),
));
let set_state = VoterSetState::<Block>::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: 0,
state: round_state,
votes: Vec::new(),
base: (new_set.canon_hash, new_set.canon_number),
}),
completed_rounds: CompletedRounds::new(
CompletedRound {
number: 0,
state: round_state,
votes: Vec::new(),
base: (new_set.canon_hash, new_set.canon_number),
},
new_set.set_id,
&set,
),
current_round: HasVoted::No,
};
let encoded = set_state.encode();
@@ -500,7 +521,7 @@ mod test {
assert_eq!(
*authority_set.inner().read(),
AuthoritySet {
current_authorities: authorities,
current_authorities: authorities.clone(),
pending_standard_changes: ForkTree::new(),
pending_forced_changes: Vec::new(),
set_id,
@@ -510,12 +531,16 @@ mod test {
assert_eq!(
&*set_state.read(),
&VoterSetState::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: round_number,
state: round_state.clone(),
base: round_state.prevote_ghost.unwrap(),
votes: vec![],
}),
completed_rounds: CompletedRounds::new(
CompletedRound {
number: round_number,
state: round_state.clone(),
base: round_state.prevote_ghost.unwrap(),
votes: vec![],
},
set_id,
&*authority_set.inner().read(),
),
current_round: HasVoted::No,
},
);
@@ -583,7 +608,7 @@ mod test {
assert_eq!(
*authority_set.inner().read(),
AuthoritySet {
current_authorities: authorities,
current_authorities: authorities.clone(),
pending_standard_changes: ForkTree::new(),
pending_forced_changes: Vec::new(),
set_id,
@@ -593,12 +618,16 @@ mod test {
assert_eq!(
&*set_state.read(),
&VoterSetState::Live {
completed_rounds: CompletedRounds::new(CompletedRound {
number: round_number,
state: round_state.clone(),
base: round_state.prevote_ghost.unwrap(),
votes: vec![],
}),
completed_rounds: CompletedRounds::new(
CompletedRound {
number: round_number,
state: round_state.clone(),
base: round_state.prevote_ghost.unwrap(),
votes: vec![],
},
set_id,
&*authority_set.inner().read(),
),
current_round: HasVoted::No,
},
);
@@ -71,6 +71,7 @@ use runtime_primitives::traits::{NumberFor, Block as BlockT, Zero};
use network::consensus_gossip::{self as network_gossip, MessageIntent, ValidatorContext};
use network::{config::Roles, PeerId};
use parity_codec::{Encode, Decode};
use crate::ed25519::Public as AuthorityId;
use substrate_telemetry::{telemetry, CONSENSUS_DEBUG};
use log::{trace, debug, warn};
@@ -94,6 +95,8 @@ enum Consider {
RejectPast,
/// Message is from the future. Reject.
RejectFuture,
/// Message cannot be evaluated. Reject.
RejectOutOfScope,
}
/// A view of protocol state.
@@ -300,6 +303,10 @@ pub(super) enum Misbehavior {
// A message received that's from the future relative to our view.
// always misbehavior.
FutureMessage,
// A message received that cannot be evaluated relative to our view.
// This happens before we have a view and have sent out neighbor packets.
// always misbehavior.
OutOfScopeMessage,
}
impl Misbehavior {
@@ -319,6 +326,7 @@ impl Misbehavior {
(benefit as i32).saturating_add(cost as i32)
},
FutureMessage => cost::FUTURE_MESSAGE,
OutOfScopeMessage => cost::OUT_OF_SCOPE_MESSAGE,
}
}
}
@@ -407,7 +415,7 @@ impl<N: Ord> Peers<N> {
}
}
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub(super) enum Action<H> {
// repropagate under given topic, to the given peers, applying cost/benefit to originator.
Keep(H, i32),
@@ -418,9 +426,10 @@ pub(super) enum Action<H> {
}
struct Inner<Block: BlockT> {
local_view: View<NumberFor<Block>>,
local_view: Option<View<NumberFor<Block>>>,
peers: Peers<NumberFor<Block>>,
live_topics: KeepTopics<Block>,
authorities: Vec<AuthorityId>,
config: crate::Config,
next_rebroadcast: Instant,
}
@@ -430,58 +439,87 @@ type MaybeMessage<Block> = Option<(Vec<PeerId>, NeighborPacket<NumberFor<Block>>
impl<Block: BlockT> Inner<Block> {
fn new(config: crate::Config) -> Self {
Inner {
local_view: View::default(),
local_view: None,
peers: Peers::default(),
live_topics: KeepTopics::new(),
next_rebroadcast: Instant::now() + REBROADCAST_AFTER,
authorities: Vec::new(),
config,
}
}
/// Note a round in a set has started.
fn note_round(&mut self, round: Round, set_id: SetId) -> MaybeMessage<Block> {
if self.local_view.round == round && self.local_view.set_id == set_id {
return None;
/// Note a round in the current set has started.
fn note_round(&mut self, round: Round) -> MaybeMessage<Block> {
{
let local_view = match self.local_view {
None => return None,
Some(ref mut v) => if v.round == round {
return None
} else {
v
},
};
let set_id = local_view.set_id;
debug!(target: "afg", "Voter {} noting beginning of round {:?} to network.",
self.config.name(), (round,set_id));
local_view.round = round;
self.live_topics.push(round, set_id);
}
debug!(target: "afg", "Voter {} noting beginning of round {:?} to network.",
self.config.name(), (round, set_id));
self.local_view.round = round;
self.local_view.set_id = set_id;
self.live_topics.push(round, set_id);
self.multicast_neighbor_packet()
}
/// Note that a voter set with given ID has started. Does nothing if the last
/// call to the function was with the same `set_id`.
fn note_set(&mut self, set_id: SetId) -> MaybeMessage<Block> {
if self.local_view.set_id == set_id {
return None;
}
fn note_set(&mut self, set_id: SetId, authorities: Vec<AuthorityId>) -> MaybeMessage<Block> {
{
let local_view = match self.local_view {
ref mut x @ None => x.get_or_insert(View {
round: Round(0),
set_id,
last_commit: None,
}),
Some(ref mut v) => if v.set_id == set_id {
return None
} else {
v
},
};
self.local_view.update_set(set_id);
self.live_topics.push(Round(0), set_id);
local_view.update_set(set_id);
self.live_topics.push(Round(0), set_id);
self.authorities = authorities;
}
self.multicast_neighbor_packet()
}
/// Note that we've imported a commit finalizing a given block.
fn note_commit_finalized(&mut self, finalized: NumberFor<Block>) -> MaybeMessage<Block> {
if self.local_view.last_commit.as_ref() < Some(&finalized) {
self.local_view.last_commit = Some(finalized);
self.multicast_neighbor_packet()
} else {
None
{
match self.local_view {
None => return None,
Some(ref mut v) => if v.last_commit.as_ref() < Some(&finalized) {
v.last_commit = Some(finalized);
} else {
return None
},
};
}
self.multicast_neighbor_packet()
}
fn consider_vote(&self, round: Round, set_id: SetId) -> Consider {
self.local_view.consider_vote(round, set_id)
self.local_view.as_ref().map(|v| v.consider_vote(round, set_id))
.unwrap_or(Consider::RejectOutOfScope)
}
fn consider_global(&self, set_id: SetId, number: NumberFor<Block>) -> Consider {
self.local_view.consider_global(set_id, number)
self.local_view.as_ref().map(|v| v.consider_global(set_id, number))
.unwrap_or(Consider::RejectOutOfScope)
}
fn cost_past_rejection(&self, _who: &PeerId, _round: Round, _set_id: SetId) -> i32 {
@@ -494,11 +532,18 @@ impl<Block: BlockT> Inner<Block> {
{
match self.consider_vote(full.round, full.set_id) {
Consider::RejectFuture => return Action::Discard(Misbehavior::FutureMessage.cost()),
Consider::RejectOutOfScope => return Action::Discard(Misbehavior::OutOfScopeMessage.cost()),
Consider::RejectPast =>
return Action::Discard(self.cost_past_rejection(who, full.round, full.set_id)),
Consider::Accept => {},
}
// ensure authority is part of the set.
if !self.authorities.contains(&full.message.id) {
telemetry!(CONSENSUS_DEBUG; "afg.bad_msg_signature"; "signature" => ?full.message.id);
return Action::Discard(cost::UNKNOWN_VOTER);
}
if let Err(()) = super::check_message_sig::<Block>(
&full.message.message,
&full.message.id,
@@ -527,7 +572,9 @@ impl<Block: BlockT> Inner<Block> {
Consider::RejectFuture => return Action::Discard(Misbehavior::FutureMessage.cost()),
Consider::RejectPast =>
return Action::Discard(self.cost_past_rejection(who, full.round, full.set_id)),
Consider::RejectOutOfScope => return Action::Discard(Misbehavior::OutOfScopeMessage.cost()),
Consider::Accept => {},
}
if full.message.precommits.len() != full.message.auth_data.len() || full.message.precommits.is_empty() {
@@ -561,14 +608,16 @@ impl<Block: BlockT> Inner<Block> {
}
fn multicast_neighbor_packet(&self) -> MaybeMessage<Block> {
let packet = NeighborPacket {
round: self.local_view.round,
set_id: self.local_view.set_id,
commit_finalized_height: self.local_view.last_commit.unwrap_or(Zero::zero()),
};
self.local_view.as_ref().map(|local_view| {
let packet = NeighborPacket {
round: local_view.round,
set_id: local_view.set_id,
commit_finalized_height: local_view.last_commit.unwrap_or(Zero::zero()),
};
let peers = self.peers.inner.keys().cloned().collect();
Some((peers, packet))
let peers = self.peers.inner.keys().cloned().collect();
(peers, packet)
})
}
}
@@ -579,7 +628,7 @@ pub(super) struct GossipValidator<Block: BlockT> {
}
impl<Block: BlockT> GossipValidator<Block> {
/// Create a new gossip-validator.
/// Create a new gossip-validator. This initialized the current set to 0.
pub(super) fn new(config: crate::Config) -> (GossipValidator<Block>, ReportStream) {
let (tx, rx) = mpsc::unbounded();
let val = GossipValidator {
@@ -590,21 +639,22 @@ impl<Block: BlockT> GossipValidator<Block> {
(val, ReportStream { reports: rx })
}
/// Note a round in a set has started.
pub(super) fn note_round<F>(&self, round: Round, set_id: SetId, send_neighbor: F)
/// Note a round in the current set has started.
pub(super) fn note_round<F>(&self, round: Round, send_neighbor: F)
where F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)
{
let maybe_msg = self.inner.write().note_round(round, set_id);
let maybe_msg = self.inner.write().note_round(round);
if let Some((to, msg)) = maybe_msg {
send_neighbor(to, msg);
}
}
/// Note that a voter set with given ID has started.
pub(super) fn note_set<F>(&self, set_id: SetId, send_neighbor: F)
/// Note that a voter set with given ID has started. Updates the current set to given
/// value and initializes the round to 0.
pub(super) fn note_set<F>(&self, set_id: SetId, authorities: Vec<AuthorityId>, send_neighbor: F)
where F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)
{
let maybe_msg = self.inner.write().note_set(set_id);
let maybe_msg = self.inner.write().note_set(set_id, authorities);
if let Some((to, msg)) = maybe_msg {
send_neighbor(to, msg);
}
@@ -658,19 +708,23 @@ impl<Block: BlockT> GossipValidator<Block> {
impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block> {
fn new_peer(&self, context: &mut dyn ValidatorContext<Block>, who: &PeerId, _roles: Roles) {
let packet_data = {
let packet = {
let mut inner = self.inner.write();
inner.peers.new_peer(who.clone());
let packet = NeighborPacket {
round: inner.local_view.round,
set_id: inner.local_view.set_id,
commit_finalized_height: inner.local_view.last_commit.unwrap_or(Zero::zero()),
};
GossipMessage::<Block>::from(packet).encode()
inner.local_view.as_ref().map(|v| {
NeighborPacket {
round: v.round,
set_id: v.set_id,
commit_finalized_height: v.last_commit.unwrap_or(Zero::zero()),
}
})
};
context.send_message(who, packet_data);
if let Some(packet) = packet {
let packet_data = GossipMessage::<Block>::from(packet).encode();
context.send_message(who, packet_data);
}
}
fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId) {
@@ -746,7 +800,12 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block>
}
// global message.
let our_best_commit = inner.local_view.last_commit;
let local_view = match inner.local_view {
Some(ref v) => v,
None => return false, // cannot evaluate until we have a local view.
};
let our_best_commit = local_view.last_commit;
let peer_best_commit = peer.view.last_commit;
match GossipMessage::<Block>::decode(&mut data) {
@@ -774,8 +833,13 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block>
Some((None, _)) => {},
};
let local_view = match inner.local_view {
Some(ref v) => v,
None => return true, // no local view means we can't evaluate or hold any topic.
};
// global messages -- only keep the best commit.
let best_commit = inner.local_view.last_commit;
let best_commit = local_view.last_commit;
match GossipMessage::<Block>::decode(&mut data) {
None => true,
@@ -1004,8 +1068,10 @@ mod tests {
let set_id = 1;
val.note_set(SetId(set_id), Vec::new(), |_, _| {});
for round_num in 1u64..10 {
val.note_round(Round(round_num), SetId(set_id), |_, _| {});
val.note_round(Round(round_num), |_, _| {});
}
{
@@ -1025,4 +1091,47 @@ mod tests {
}
}
}
#[test]
fn message_from_unknown_authority_discarded() {
assert!(cost::UNKNOWN_VOTER != cost::BAD_SIGNATURE);
let (val, _) = GossipValidator::<Block>::new(config());
let set_id = 1;
let auth = AuthorityId::from_raw([1u8; 32]);
let peer = PeerId::random();
val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {});
val.note_round(Round(0), |_, _| {});
let inner = val.inner.read();
let unknown_voter = inner.validate_round_message(&peer, &VoteOrPrecommitMessage {
round: Round(0),
set_id: SetId(set_id),
message: SignedMessage::<Block> {
message: grandpa::Message::Prevote(grandpa::Prevote {
target_hash: Default::default(),
target_number: 10,
}),
signature: Default::default(),
id: AuthorityId::from_raw([2u8; 32]),
}
});
let bad_sig = inner.validate_round_message(&peer, &VoteOrPrecommitMessage {
round: Round(0),
set_id: SetId(set_id),
message: SignedMessage::<Block> {
message: grandpa::Message::Prevote(grandpa::Prevote {
target_hash: Default::default(),
target_number: 10,
}),
signature: Default::default(),
id: auth.clone(),
}
});
assert_eq!(unknown_voter, Action::Discard(cost::UNKNOWN_VOTER));
assert_eq!(bad_sig, Action::Discard(cost::BAD_SIGNATURE));
}
}
@@ -64,12 +64,14 @@ mod cost {
pub(super) const BAD_SIGNATURE: i32 = -100;
pub(super) const MALFORMED_COMMIT: i32 = -1000;
pub(super) const FUTURE_MESSAGE: i32 = -500;
pub(super) const UNKNOWN_VOTER: i32 = -150;
pub(super) const INVALID_VIEW_CHANGE: i32 = -500;
pub(super) const PER_UNDECODABLE_BYTE: i32 = -5;
pub(super) const PER_SIGNATURE_CHECKED: i32 = -25;
pub(super) const PER_BLOCK_LOADED: i32 = -10;
pub(super) const INVALID_COMMIT: i32 = -5000;
pub(super) const OUT_OF_SCOPE_MESSAGE: i32 = -500;
}
// benefit scalars for reporting peers.
@@ -233,7 +235,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
pub(crate) fn new(
service: N,
config: crate::Config,
set_state: Option<(u64, &crate::environment::VoterSetState<B>)>,
set_state: Option<&crate::environment::VoterSetState<B>>,
on_exit: impl Future<Item=(),Error=()> + Clone + Send + 'static,
) -> (
Self,
@@ -244,15 +246,18 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
let validator = Arc::new(validator);
service.register_validator(validator.clone());
if let Some((set_id, set_state)) = set_state {
if let Some(set_state) = set_state {
// register all previous votes with the gossip service so that they're
// available to peers potentially stuck on a previous round.
for round in set_state.completed_rounds().iter() {
let completed = set_state.completed_rounds();
let (set_id, voters) = completed.set_info();
validator.note_set(SetId(set_id), voters.to_vec(), |_, _| {});
for round in completed.iter() {
let topic = round_topic::<B>(round.number, set_id);
// we need to note the round with the gossip validator otherwise
// messages will be ignored.
validator.note_round(Round(round.number), SetId(set_id), |_, _| {});
validator.note_round(Round(round.number), |_, _| {});
for signed in round.votes.iter() {
let message = gossip::GossipMessage::VoteOrPrecommit(
@@ -295,7 +300,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
(bridge, startup_work)
}
/// Get the round messages for a round in a given set ID. These are signature-checked.
/// Get the round messages for a round in the current set ID. These are signature-checked.
pub(crate) fn round_communication(
&self,
round: Round,
@@ -307,9 +312,18 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
impl Stream<Item=SignedMessage<B>,Error=Error>,
impl Sink<SinkItem=Message<B>,SinkError=Error>,
) {
// is a no-op if currently in that set.
self.validator.note_set(
set_id,
voters.voters().iter().map(|(v, _)| v.clone()).collect(),
|to, neighbor| self.service.send_message(
to,
GossipMessage::<B>::from(neighbor).encode()
),
);
self.validator.note_round(
round,
set_id,
|to, neighbor| self.service.send_message(
to,
GossipMessage::<B>::from(neighbor).encode()
@@ -410,6 +424,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
) {
self.validator.note_set(
set_id,
voters.voters().iter().map(|(v, _)| v.clone()).collect(),
|to, neighbor| self.service.send_message(to, GossipMessage::<B>::from(neighbor).encode()),
);
@@ -650,7 +665,7 @@ fn check_compact_commit<Block: BlockT>(
let f = voters.total_weight() - voters.threshold();
let full_threshold = voters.total_weight() + f;
// check total weight is not too high.
// check total weight is not out of range.
let mut total_weight = 0;
for (_, ref id) in &msg.auth_data {
if let Some(weight) = voters.info(id).map(|info| info.weight()) {
@@ -46,7 +46,7 @@ use crate::{
use consensus_common::SelectChain;
use crate::authorities::SharedAuthoritySet;
use crate::authorities::{AuthoritySet, SharedAuthoritySet};
use crate::consensus_changes::SharedConsensusChanges;
use crate::justification::GrandpaJustification;
use crate::until_imported::UntilVoteTargetImported;
@@ -66,11 +66,13 @@ pub struct CompletedRound<Block: BlockT> {
pub votes: Vec<SignedMessage<Block>>,
}
// Data about last completed rounds. Stores NUM_LAST_COMPLETED_ROUNDS and always
// Data about last completed rounds within a single voter set. Stores NUM_LAST_COMPLETED_ROUNDS and always
// contains data about at least one round (genesis).
#[derive(Debug, Clone, PartialEq)]
pub struct CompletedRounds<Block: BlockT> {
inner: VecDeque<CompletedRound<Block>>,
rounds: VecDeque<CompletedRound<Block>>,
set_id: u64,
voters: Vec<AuthorityId>,
}
// NOTE: the current strategy for persisting completed rounds is very naive
@@ -80,34 +82,51 @@ const NUM_LAST_COMPLETED_ROUNDS: usize = 2;
impl<Block: BlockT> Encode for CompletedRounds<Block> {
fn encode(&self) -> Vec<u8> {
Vec::from_iter(&self.inner).encode()
let v = Vec::from_iter(&self.rounds);
(&v, &self.set_id, &self.voters).encode()
}
}
impl<Block: BlockT> Decode for CompletedRounds<Block> {
fn decode<I: parity_codec::Input>(value: &mut I) -> Option<Self> {
Vec::<CompletedRound<Block>>::decode(value)
.map(|completed_rounds| CompletedRounds {
inner: completed_rounds.into(),
<(Vec<CompletedRound<Block>>, u64, Vec<AuthorityId>)>::decode(value)
.map(|(rounds, set_id, voters)| CompletedRounds {
rounds: rounds.into(),
set_id,
voters,
})
}
}
impl<Block: BlockT> CompletedRounds<Block> {
/// Create a new completed rounds tracker with NUM_LAST_COMPLETED_ROUNDS capacity.
pub fn new(genesis: CompletedRound<Block>) -> CompletedRounds<Block> {
let mut inner = VecDeque::with_capacity(NUM_LAST_COMPLETED_ROUNDS);
inner.push_back(genesis);
CompletedRounds { inner }
pub(crate) fn new(
genesis: CompletedRound<Block>,
set_id: u64,
voters: &AuthoritySet<Block::Hash, NumberFor<Block>>,
)
-> CompletedRounds<Block>
{
let mut rounds = VecDeque::with_capacity(NUM_LAST_COMPLETED_ROUNDS);
rounds.push_back(genesis);
let voters = voters.current().1.iter().map(|(a, _)| a.clone()).collect();
CompletedRounds { rounds, set_id, voters }
}
/// Get the set-id and voter set of the completed rounds.
pub fn set_info(&self) -> (u64, &[AuthorityId]) {
(self.set_id, &self.voters[..])
}
/// Iterate over all completed rounds.
pub fn iter(&self) -> impl Iterator<Item=&CompletedRound<Block>> {
self.inner.iter()
self.rounds.iter()
}
/// Returns the last (latest) completed round.
pub fn last(&self) -> &CompletedRound<Block> {
self.inner.back()
self.rounds.back()
.expect("inner is never empty; always contains at least genesis; qed")
}
@@ -118,11 +137,11 @@ impl<Block: BlockT> CompletedRounds<Block> {
return false;
}
if self.inner.len() == NUM_LAST_COMPLETED_ROUNDS {
self.inner.pop_front();
if self.rounds.len() == NUM_LAST_COMPLETED_ROUNDS {
self.rounds.pop_front();
}
self.inner.push_back(completed_round);
self.rounds.push_back(completed_round);
true
}
+11 -7
View File
@@ -501,7 +501,7 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>(
let (network, network_startup) = NetworkBridge::new(
network,
config.clone(),
Some((authority_set.set_id(), &set_state.read())),
Some(&set_state.read()),
on_exit.clone(),
);
@@ -640,12 +640,16 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>(
let set_state = VoterSetState::Live {
// always start at round 0 when changing sets.
completed_rounds: CompletedRounds::new(CompletedRound {
number: 0,
state: genesis_state,
base: (new.canon_hash, new.canon_number),
votes: Vec::new(),
}),
completed_rounds: CompletedRounds::new(
CompletedRound {
number: 0,
state: genesis_state,
base: (new.canon_hash, new.canon_number),
votes: Vec::new(),
},
new.set_id,
&*authority_set.inner().read(),
),
current_round: HasVoted::No,
};
@@ -225,12 +225,16 @@ pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC>(
let set_state = VoterSetState::Live::<Block> {
// always start at round 0 when changing sets.
completed_rounds: CompletedRounds::new(CompletedRound {
number: 0,
state: genesis_state,
base: (new.canon_hash, new.canon_number),
votes: Vec::new(),
}),
completed_rounds: CompletedRounds::new(
CompletedRound {
number: 0,
state: genesis_state,
base: (new.canon_hash, new.canon_number),
votes: Vec::new(),
},
new.set_id,
&*authority_set.inner().read(),
),
current_round: HasVoted::No,
};