grandpa: make observer relay round messages (#3021)

* grandpa: make observer relay round messages

* grandpa: observer also notes next round

* grandpa: no need to note commit round
This commit is contained in:
André Silva
2019-07-05 08:55:10 +01:00
committed by Gavin Wood
parent a199a989b8
commit 13164304b3
2 changed files with 45 additions and 9 deletions
@@ -309,17 +309,12 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
(bridge, startup_work)
}
/// Get the round messages for a round in the current set ID. These are signature-checked.
pub(crate) fn round_communication(
/// Note the beginning of a new round to the `GossipValidator`.
pub(crate) fn note_round(
&self,
round: Round,
set_id: SetId,
voters: Arc<VoterSet<AuthorityId>>,
local_key: Option<Arc<ed25519::Pair>>,
has_voted: HasVoted<B>,
) -> (
impl Stream<Item=SignedMessage<B>,Error=Error>,
impl Sink<SinkItem=Message<B>,SinkError=Error>,
voters: &VoterSet<AuthorityId>,
) {
// is a no-op if currently in that set.
self.validator.note_set(
@@ -338,6 +333,25 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
GossipMessage::<B>::from(neighbor).encode()
),
);
}
/// Get the round messages for a round in the current set ID. These are signature-checked.
pub(crate) fn round_communication(
&self,
round: Round,
set_id: SetId,
voters: Arc<VoterSet<AuthorityId>>,
local_key: Option<Arc<ed25519::Pair>>,
has_voted: HasVoted<B>,
) -> (
impl Stream<Item=SignedMessage<B>,Error=Error>,
impl Sink<SinkItem=Message<B>,SinkError=Error>,
) {
self.note_round(
round,
set_id,
&*voters,
);
let locals = local_key.and_then(|pair| {
let public = pair.public();
@@ -57,13 +57,14 @@ impl<'a, Block: BlockT<Hash=H256>, B, E, RA> grandpa::Chain<Block::Hash, NumberF
}
}
fn grandpa_observer<B, E, Block: BlockT<Hash=H256>, RA, S>(
fn grandpa_observer<B, E, Block: BlockT<Hash=H256>, RA, S, F>(
client: &Arc<Client<B, E, Block, RA>>,
authority_set: &SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
consensus_changes: &SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
voters: &Arc<VoterSet<AuthorityId>>,
last_finalized_number: NumberFor<Block>,
commits: S,
note_round: F,
) -> impl Future<Item=(), Error=CommandOrError<H256, NumberFor<Block>>> where
NumberFor<Block>: BlockNumberOps,
B: Backend<Block, Blake2Hasher>,
@@ -73,6 +74,7 @@ fn grandpa_observer<B, E, Block: BlockT<Hash=H256>, RA, S>(
Item = CommunicationIn<Block>,
Error = CommandOrError<Block::Hash, NumberFor<Block>>,
>,
F: Fn(u64),
{
let authority_set = authority_set.clone();
let consensus_changes = consensus_changes.clone();
@@ -124,6 +126,10 @@ fn grandpa_observer<B, E, Block: BlockT<Hash=H256>, RA, S>(
Err(e) => return future::err(e),
};
// note that we've observed completion of this round through the commit,
// and that implies that the next round has started.
note_round(round + 1);
grandpa::process_commit_validation_result(validation_result, callback);
// proceed processing with new finalized block number
@@ -188,6 +194,21 @@ pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC>(
let last_finalized_number = client.info().chain.finalized_number;
// NOTE: since we are not using `round_communication` we have to
// manually note the round with the gossip validator, otherwise we won't
// relay round messages. we want all full nodes to contribute to vote
// availability.
let note_round = {
let network = network.clone();
let voters = voters.clone();
move |round| network.note_round(
crate::communication::Round(round),
crate::communication::SetId(set_id),
&*voters,
)
};
// create observer for the current set
let observer = grandpa_observer(
&client,
@@ -196,6 +217,7 @@ pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC>(
&voters,
last_finalized_number,
global_in,
note_round,
);
let handle_voter_command = move |command, voter_commands_rx| {