mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-17 18:21:02 +00:00
grandpa: observer (#2244)
* grandpa: initial implementation of minimal grandpa worker * grandpa: extract grandpa observer future to function * grandpa: add test for observer * grandpa: start observer if no local key is defined * grandpa: add minor comments * grandpa: observer: log invalid commit * grandpa: observer: persist voter set state on authority change and pause * grandpa: observer: use commit processing callback * grandpa: keep run_grandpa to avoid breaking public api * grandpa: use grandpa::process_commit_validation_result * grandpa: use finality-grandpa 0.7.2
This commit is contained in:
committed by
Robert Habermeier
parent
96ef462c46
commit
e31cd26a9e
Generated
+583
-560
File diff suppressed because it is too large
Load Diff
@@ -22,7 +22,7 @@ network = { package = "substrate-network", path = "../network" }
|
|||||||
service = { package = "substrate-service", path = "../service", optional = true }
|
service = { package = "substrate-service", path = "../service", optional = true }
|
||||||
srml-finality-tracker = { path = "../../srml/finality-tracker" }
|
srml-finality-tracker = { path = "../../srml/finality-tracker" }
|
||||||
fg_primitives = { package = "substrate-finality-grandpa-primitives", path = "primitives" }
|
fg_primitives = { package = "substrate-finality-grandpa-primitives", path = "primitives" }
|
||||||
grandpa = { package = "finality-grandpa", version = "0.7.1", features = ["derive-codec"] }
|
grandpa = { package = "finality-grandpa", version = "0.7.2", features = ["derive-codec"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
consensus_common = { package = "substrate-consensus-common", path = "../consensus/common", features = ["test-helpers"] }
|
consensus_common = { package = "substrate-consensus-common", path = "../consensus/common", features = ["test-helpers"] }
|
||||||
|
|||||||
@@ -298,31 +298,7 @@ impl<Block: BlockT<Hash=H256>, B, E, N, RA> grandpa::Chain<Block::Hash, NumberFo
|
|||||||
NumberFor<Block>: BlockNumberOps,
|
NumberFor<Block>: BlockNumberOps,
|
||||||
{
|
{
|
||||||
fn ancestry(&self, base: Block::Hash, block: Block::Hash) -> Result<Vec<Block::Hash>, GrandpaError> {
|
fn ancestry(&self, base: Block::Hash, block: Block::Hash) -> Result<Vec<Block::Hash>, GrandpaError> {
|
||||||
if base == block { return Err(GrandpaError::NotDescendent) }
|
ancestry(&self.inner, base, block)
|
||||||
|
|
||||||
let tree_route_res = ::client::blockchain::tree_route(
|
|
||||||
self.inner.backend().blockchain(),
|
|
||||||
BlockId::Hash(block),
|
|
||||||
BlockId::Hash(base),
|
|
||||||
);
|
|
||||||
|
|
||||||
let tree_route = match tree_route_res {
|
|
||||||
Ok(tree_route) => tree_route,
|
|
||||||
Err(e) => {
|
|
||||||
debug!(target: "afg", "Encountered error computing ancestry between block {:?} and base {:?}: {:?}",
|
|
||||||
block, base, e);
|
|
||||||
|
|
||||||
return Err(GrandpaError::NotDescendent);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if tree_route.common_block().hash != base {
|
|
||||||
return Err(GrandpaError::NotDescendent);
|
|
||||||
}
|
|
||||||
|
|
||||||
// skip one because our ancestry is meant to start from the parent of `block`,
|
|
||||||
// and `tree_route` includes it.
|
|
||||||
Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn best_chain_containing(&self, block: Block::Hash) -> Option<(Block::Hash, NumberFor<Block>)> {
|
fn best_chain_containing(&self, block: Block::Hash) -> Option<(Block::Hash, NumberFor<Block>)> {
|
||||||
@@ -400,6 +376,41 @@ impl<Block: BlockT<Hash=H256>, B, E, N, RA> grandpa::Chain<Block::Hash, NumberFo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn ancestry<B, Block: BlockT<Hash=H256>, E, RA>(
|
||||||
|
client: &Client<B, E, Block, RA>,
|
||||||
|
base: Block::Hash,
|
||||||
|
block: Block::Hash,
|
||||||
|
) -> Result<Vec<Block::Hash>, GrandpaError> where
|
||||||
|
B: Backend<Block, Blake2Hasher>,
|
||||||
|
E: CallExecutor<Block, Blake2Hasher>,
|
||||||
|
{
|
||||||
|
if base == block { return Err(GrandpaError::NotDescendent) }
|
||||||
|
|
||||||
|
let tree_route_res = ::client::blockchain::tree_route(
|
||||||
|
client.backend().blockchain(),
|
||||||
|
BlockId::Hash(block),
|
||||||
|
BlockId::Hash(base),
|
||||||
|
);
|
||||||
|
|
||||||
|
let tree_route = match tree_route_res {
|
||||||
|
Ok(tree_route) => tree_route,
|
||||||
|
Err(e) => {
|
||||||
|
debug!(target: "afg", "Encountered error computing ancestry between block {:?} and base {:?}: {:?}",
|
||||||
|
block, base, e);
|
||||||
|
|
||||||
|
return Err(GrandpaError::NotDescendent);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if tree_route.common_block().hash != base {
|
||||||
|
return Err(GrandpaError::NotDescendent);
|
||||||
|
}
|
||||||
|
|
||||||
|
// skip one because our ancestry is meant to start from the parent of `block`,
|
||||||
|
// and `tree_route` includes it.
|
||||||
|
Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect())
|
||||||
|
}
|
||||||
|
|
||||||
impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, NumberFor<Block>> for Environment<B, E, Block, N, RA> where
|
impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, NumberFor<Block>> for Environment<B, E, Block, N, RA> where
|
||||||
Block: 'static,
|
Block: 'static,
|
||||||
B: Backend<Block, Blake2Hasher> + 'static,
|
B: Backend<Block, Blake2Hasher> + 'static,
|
||||||
|
|||||||
@@ -89,6 +89,7 @@ mod environment;
|
|||||||
mod finality_proof;
|
mod finality_proof;
|
||||||
mod import;
|
mod import;
|
||||||
mod justification;
|
mod justification;
|
||||||
|
mod observer;
|
||||||
mod until_imported;
|
mod until_imported;
|
||||||
|
|
||||||
#[cfg(feature="service-integration")]
|
#[cfg(feature="service-integration")]
|
||||||
@@ -97,6 +98,7 @@ mod service_integration;
|
|||||||
pub use service_integration::{LinkHalfForService, BlockImportForService};
|
pub use service_integration::{LinkHalfForService, BlockImportForService};
|
||||||
pub use communication::Network;
|
pub use communication::Network;
|
||||||
pub use finality_proof::{prove_finality, check_finality_proof};
|
pub use finality_proof::{prove_finality, check_finality_proof};
|
||||||
|
pub use observer::run_grandpa_observer;
|
||||||
|
|
||||||
use aux_schema::PersistentData;
|
use aux_schema::PersistentData;
|
||||||
use environment::{CompletedRound, CompletedRounds, Environment, HasVoted, SharedVoterSetState, VoterSetState};
|
use environment::{CompletedRound, CompletedRounds, Environment, HasVoted, SharedVoterSetState, VoterSetState};
|
||||||
@@ -433,7 +435,7 @@ fn register_finality_tracker_inherent_data_provider<B, E, Block: BlockT<Hash=H25
|
|||||||
|
|
||||||
/// Run a GRANDPA voter as a task. Provide configuration and a link to a
|
/// Run a GRANDPA voter as a task. Provide configuration and a link to a
|
||||||
/// block import worker that has already been instantiated with `block_import`.
|
/// block import worker that has already been instantiated with `block_import`.
|
||||||
pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
|
pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA>(
|
||||||
config: Config,
|
config: Config,
|
||||||
link: LinkHalf<B, E, Block, RA>,
|
link: LinkHalf<B, E, Block, RA>,
|
||||||
network: N,
|
network: N,
|
||||||
@@ -656,3 +658,24 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
|
|||||||
|
|
||||||
Ok(voter_work.select(on_exit).then(|_| Ok(())))
|
Ok(voter_work.select(on_exit).then(|_| Ok(())))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[deprecated(since = "1.1", note = "Please switch to run_grandpa_voter.")]
|
||||||
|
pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
|
||||||
|
config: Config,
|
||||||
|
link: LinkHalf<B, E, Block, RA>,
|
||||||
|
network: N,
|
||||||
|
inherent_data_providers: InherentDataProviders,
|
||||||
|
on_exit: impl Future<Item=(),Error=()> + Clone + Send + 'static,
|
||||||
|
) -> ::client::error::Result<impl Future<Item=(),Error=()> + Send + 'static> where
|
||||||
|
Block::Hash: Ord,
|
||||||
|
B: Backend<Block, Blake2Hasher> + 'static,
|
||||||
|
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
|
||||||
|
N: Network<Block> + Send + Sync + 'static,
|
||||||
|
N::In: Send + 'static,
|
||||||
|
NumberFor<Block>: BlockNumberOps,
|
||||||
|
DigestFor<Block>: Encode,
|
||||||
|
DigestItemFor<Block>: DigestItem<AuthorityId=AuthorityId>,
|
||||||
|
RA: Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
run_grandpa_voter(config, link, network, inherent_data_providers, on_exit)
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,281 @@
|
|||||||
|
// Copyright 2018-2019 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Substrate.
|
||||||
|
|
||||||
|
// Substrate is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Substrate is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use futures::prelude::*;
|
||||||
|
use futures::future::{self, Loop as FutureLoop};
|
||||||
|
|
||||||
|
use grandpa::{
|
||||||
|
BlockNumberOps, Error as GrandpaError, round::State as RoundState, voter, voter_set::VoterSet
|
||||||
|
};
|
||||||
|
use log::{debug, info, warn};
|
||||||
|
|
||||||
|
use client::{CallExecutor, Client, backend::Backend};
|
||||||
|
use ed25519::Public as AuthorityId;
|
||||||
|
use runtime_primitives::traits::{NumberFor, Block as BlockT, DigestItemFor, DigestItem};
|
||||||
|
use substrate_primitives::{ed25519, H256, Blake2Hasher};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
AuthoritySignature, global_communication, CommandOrError, Config, environment,
|
||||||
|
Error, LinkHalf, Network, aux_schema::PersistentData, VoterCommand, VoterSetState,
|
||||||
|
};
|
||||||
|
use crate::authorities::SharedAuthoritySet;
|
||||||
|
use crate::communication::NetworkBridge;
|
||||||
|
use crate::consensus_changes::SharedConsensusChanges;
|
||||||
|
use crate::environment::{CompletedRound, CompletedRounds, HasVoted};
|
||||||
|
|
||||||
|
struct ObserverChain<'a, Block: BlockT, B, E, RA>(&'a Client<B, E, Block, RA>);
|
||||||
|
|
||||||
|
impl<'a, Block: BlockT<Hash=H256>, B, E, RA> grandpa::Chain<Block::Hash, NumberFor<Block>>
|
||||||
|
for ObserverChain<'a, Block, B, E, RA> where
|
||||||
|
B: Backend<Block, Blake2Hasher>,
|
||||||
|
E: CallExecutor<Block, Blake2Hasher>,
|
||||||
|
NumberFor<Block>: BlockNumberOps,
|
||||||
|
{
|
||||||
|
fn ancestry(&self, base: Block::Hash, block: Block::Hash) -> Result<Vec<Block::Hash>, GrandpaError> {
|
||||||
|
environment::ancestry(&self.0, base, block)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn best_chain_containing(&self, _block: Block::Hash) -> Option<(Block::Hash, NumberFor<Block>)> {
|
||||||
|
// only used by voter
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn grandpa_observer<B, E, Block: BlockT<Hash=H256>, RA, S>(
|
||||||
|
client: &Arc<Client<B, E, Block, RA>>,
|
||||||
|
authority_set: &SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
|
||||||
|
consensus_changes: &SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
|
||||||
|
voters: &Arc<VoterSet<AuthorityId>>,
|
||||||
|
last_finalized_number: NumberFor<Block>,
|
||||||
|
commits: S,
|
||||||
|
) -> impl Future<Item=(), Error=CommandOrError<H256, NumberFor<Block>>> where
|
||||||
|
NumberFor<Block>: BlockNumberOps,
|
||||||
|
B: Backend<Block, Blake2Hasher>,
|
||||||
|
E: CallExecutor<Block, Blake2Hasher> + Send + Sync,
|
||||||
|
RA: Send + Sync,
|
||||||
|
S: Stream<
|
||||||
|
Item = voter::CommunicationIn<H256, NumberFor<Block>, AuthoritySignature, AuthorityId>,
|
||||||
|
Error = CommandOrError<Block::Hash, NumberFor<Block>>,
|
||||||
|
>,
|
||||||
|
{
|
||||||
|
let authority_set = authority_set.clone();
|
||||||
|
let consensus_changes = consensus_changes.clone();
|
||||||
|
let client = client.clone();
|
||||||
|
let voters = voters.clone();
|
||||||
|
|
||||||
|
let observer = commits.fold(last_finalized_number, move |last_finalized_number, global| {
|
||||||
|
let (round, commit, callback) = match global {
|
||||||
|
voter::CommunicationIn::Commit(round, commit, callback) => {
|
||||||
|
let commit = grandpa::Commit::from(commit);
|
||||||
|
(round, commit, callback)
|
||||||
|
},
|
||||||
|
voter::CommunicationIn::Auxiliary(_) => {
|
||||||
|
// ignore aux messages
|
||||||
|
return future::ok(last_finalized_number);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
// if the commit we've received targets a block lower than the last
|
||||||
|
// finalized, ignore it and continue with the current state
|
||||||
|
if commit.target_number < last_finalized_number {
|
||||||
|
return future::ok(last_finalized_number);
|
||||||
|
}
|
||||||
|
|
||||||
|
let validation_result = match grandpa::validate_commit(
|
||||||
|
&commit,
|
||||||
|
&voters,
|
||||||
|
&ObserverChain(&*client),
|
||||||
|
) {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => return future::err(e.into()),
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(_) = validation_result.ghost() {
|
||||||
|
let finalized_hash = commit.target_hash;
|
||||||
|
let finalized_number = commit.target_number;
|
||||||
|
|
||||||
|
// commit is valid, finalize the block it targets
|
||||||
|
match environment::finalize_block(
|
||||||
|
&client,
|
||||||
|
&authority_set,
|
||||||
|
&consensus_changes,
|
||||||
|
None,
|
||||||
|
finalized_hash,
|
||||||
|
finalized_number,
|
||||||
|
(round, commit).into(),
|
||||||
|
) {
|
||||||
|
Ok(_) => {},
|
||||||
|
Err(e) => return future::err(e),
|
||||||
|
};
|
||||||
|
|
||||||
|
grandpa::process_commit_validation_result(validation_result, callback);
|
||||||
|
|
||||||
|
// proceed processing with new finalized block number
|
||||||
|
future::ok(finalized_number)
|
||||||
|
} else {
|
||||||
|
debug!(target: "afg", "Received invalid commit: ({:?}, {:?})", round, commit);
|
||||||
|
|
||||||
|
grandpa::process_commit_validation_result(validation_result, callback);
|
||||||
|
|
||||||
|
// commit is invalid, continue processing commits with the current state
|
||||||
|
future::ok(last_finalized_number)
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
observer.map(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run a GRANDPA observer as a task, the observer will finalize blocks only by
|
||||||
|
/// listening for and validating GRANDPA commits instead of following the full
|
||||||
|
/// protocol. Provide configuration and a link to a block import worker that has
|
||||||
|
/// already been instantiated with `block_import`.
|
||||||
|
pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA>(
|
||||||
|
config: Config,
|
||||||
|
link: LinkHalf<B, E, Block, RA>,
|
||||||
|
network: N,
|
||||||
|
on_exit: impl Future<Item=(),Error=()> + Clone + Send + 'static,
|
||||||
|
) -> ::client::error::Result<impl Future<Item=(),Error=()> + Send + 'static> where
|
||||||
|
B: Backend<Block, Blake2Hasher> + 'static,
|
||||||
|
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
|
||||||
|
N: Network<Block> + Send + Sync + 'static,
|
||||||
|
N::In: Send + 'static,
|
||||||
|
NumberFor<Block>: BlockNumberOps,
|
||||||
|
DigestItemFor<Block>: DigestItem<AuthorityId=AuthorityId>,
|
||||||
|
RA: Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
let LinkHalf {
|
||||||
|
client,
|
||||||
|
persistent_data,
|
||||||
|
voter_commands_rx,
|
||||||
|
} = link;
|
||||||
|
|
||||||
|
let PersistentData { authority_set, consensus_changes, set_state } = persistent_data;
|
||||||
|
let initial_state = (authority_set, consensus_changes, set_state, voter_commands_rx.into_future());
|
||||||
|
|
||||||
|
let (network, network_startup) = NetworkBridge::new(network, config.clone(), on_exit.clone());
|
||||||
|
|
||||||
|
let observer_work = future::loop_fn(initial_state, move |state| {
|
||||||
|
let (authority_set, consensus_changes, set_state, voter_commands_rx) = state;
|
||||||
|
let set_id = authority_set.set_id();
|
||||||
|
let voters = Arc::new(authority_set.current_authorities());
|
||||||
|
let client = client.clone();
|
||||||
|
|
||||||
|
// start global communication stream for the current set
|
||||||
|
let (global_in, _) = global_communication(
|
||||||
|
None,
|
||||||
|
set_id,
|
||||||
|
&voters,
|
||||||
|
&client,
|
||||||
|
&network,
|
||||||
|
);
|
||||||
|
|
||||||
|
let chain_info = match client.info() {
|
||||||
|
Ok(i) => i,
|
||||||
|
Err(e) => return future::Either::B(future::err(Error::Client(e))),
|
||||||
|
};
|
||||||
|
|
||||||
|
let last_finalized_number = chain_info.chain.finalized_number;
|
||||||
|
|
||||||
|
// create observer for the current set
|
||||||
|
let observer = grandpa_observer(
|
||||||
|
&client,
|
||||||
|
&authority_set,
|
||||||
|
&consensus_changes,
|
||||||
|
&voters,
|
||||||
|
last_finalized_number,
|
||||||
|
global_in,
|
||||||
|
);
|
||||||
|
|
||||||
|
let handle_voter_command = move |command, voter_commands_rx| {
|
||||||
|
// the observer doesn't use the voter set state, but we need to
|
||||||
|
// update it on-disk in case we restart as validator in the future.
|
||||||
|
let set_state = match command {
|
||||||
|
VoterCommand::Pause(reason) => {
|
||||||
|
info!(target: "afg", "Pausing old validator set: {}", reason);
|
||||||
|
|
||||||
|
let completed_rounds = set_state.read().completed_rounds();
|
||||||
|
let set_state = VoterSetState::Paused { completed_rounds };
|
||||||
|
|
||||||
|
crate::aux_schema::write_voter_set_state(&**client.backend(), &set_state)?;
|
||||||
|
|
||||||
|
set_state
|
||||||
|
},
|
||||||
|
VoterCommand::ChangeAuthorities(new) => {
|
||||||
|
// start the new authority set using the block where the
|
||||||
|
// set changed (not where the signal happened!) as the base.
|
||||||
|
let genesis_state = RoundState::genesis((new.canon_hash, new.canon_number));
|
||||||
|
|
||||||
|
let set_state = VoterSetState::Live::<Block> {
|
||||||
|
// always start at round 0 when changing sets.
|
||||||
|
completed_rounds: CompletedRounds::new(CompletedRound {
|
||||||
|
number: 0,
|
||||||
|
state: genesis_state,
|
||||||
|
base: (new.canon_hash, new.canon_number),
|
||||||
|
votes: Vec::new(),
|
||||||
|
}),
|
||||||
|
current_round: HasVoted::No,
|
||||||
|
};
|
||||||
|
|
||||||
|
crate::aux_schema::write_voter_set_state(&**client.backend(), &set_state)?;
|
||||||
|
|
||||||
|
set_state
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(FutureLoop::Continue((authority_set, consensus_changes, set_state.into(), voter_commands_rx)))
|
||||||
|
};
|
||||||
|
|
||||||
|
// run observer and listen to commands (switch authorities or pause)
|
||||||
|
future::Either::A(observer.select2(voter_commands_rx).then(move |res| match res {
|
||||||
|
Ok(future::Either::A((_, _))) => {
|
||||||
|
// observer commit stream doesn't conclude naturally; this could reasonably be an error.
|
||||||
|
Ok(FutureLoop::Break(()))
|
||||||
|
},
|
||||||
|
Err(future::Either::B(_)) => {
|
||||||
|
// the `voter_commands_rx` stream should not fail.
|
||||||
|
Ok(FutureLoop::Break(()))
|
||||||
|
},
|
||||||
|
Ok(future::Either::B(((None, _), _))) => {
|
||||||
|
// the `voter_commands_rx` stream should never conclude since it's never closed.
|
||||||
|
Ok(FutureLoop::Break(()))
|
||||||
|
},
|
||||||
|
Err(future::Either::A((CommandOrError::Error(e), _))) => {
|
||||||
|
// return inner observer error
|
||||||
|
Err(e)
|
||||||
|
},
|
||||||
|
Ok(future::Either::B(((Some(command), voter_commands_rx), _))) => {
|
||||||
|
// some command issued externally
|
||||||
|
handle_voter_command(command, voter_commands_rx.into_future())
|
||||||
|
},
|
||||||
|
Err(future::Either::A((CommandOrError::VoterCommand(command), voter_commands_rx))) => {
|
||||||
|
// some command issued internally
|
||||||
|
handle_voter_command(command, voter_commands_rx)
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
});
|
||||||
|
|
||||||
|
let observer_work = observer_work
|
||||||
|
.map(|_| ())
|
||||||
|
.map_err(|e| {
|
||||||
|
warn!("GRANDPA Observer failed: {:?}", e);
|
||||||
|
});
|
||||||
|
|
||||||
|
let observer_work = network_startup.and_then(move |()| observer_work);
|
||||||
|
|
||||||
|
Ok(observer_work.select(on_exit).map(|_| ()).map_err(|_| ()))
|
||||||
|
}
|
||||||
@@ -371,19 +371,25 @@ fn make_ids(keys: &[AuthorityKeyring]) -> Vec<(AuthorityId, u64)> {
|
|||||||
|
|
||||||
// run the voters to completion. provide a closure to be invoked after
|
// run the voters to completion. provide a closure to be invoked after
|
||||||
// the voters are spawned but before blocking on them.
|
// the voters are spawned but before blocking on them.
|
||||||
fn run_to_completion_with<F: FnOnce()>(
|
fn run_to_completion_with<F>(
|
||||||
blocks: u64,
|
blocks: u64,
|
||||||
net: Arc<Mutex<GrandpaTestNet>>,
|
net: Arc<Mutex<GrandpaTestNet>>,
|
||||||
peers: &[AuthorityKeyring],
|
peers: &[AuthorityKeyring],
|
||||||
before_waiting: F,
|
with: F,
|
||||||
) -> u64 {
|
) -> u64 where
|
||||||
|
F: FnOnce(current_thread::Handle) -> Option<Box<Future<Item=(),Error=()>>>
|
||||||
|
{
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
|
|
||||||
let mut finality_notifications = Vec::new();
|
let mut wait_for = Vec::new();
|
||||||
let mut runtime = current_thread::Runtime::new().unwrap();
|
let mut runtime = current_thread::Runtime::new().unwrap();
|
||||||
|
|
||||||
let highest_finalized = Arc::new(RwLock::new(0));
|
let highest_finalized = Arc::new(RwLock::new(0));
|
||||||
|
|
||||||
|
if let Some(f) = (with)(runtime.handle()) {
|
||||||
|
wait_for.push(f);
|
||||||
|
};
|
||||||
|
|
||||||
for (peer_id, key) in peers.iter().enumerate() {
|
for (peer_id, key) in peers.iter().enumerate() {
|
||||||
let highest_finalized = highest_finalized.clone();
|
let highest_finalized = highest_finalized.clone();
|
||||||
let (client, link) = {
|
let (client, link) = {
|
||||||
@@ -395,20 +401,25 @@ fn run_to_completion_with<F: FnOnce()>(
|
|||||||
link,
|
link,
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
finality_notifications.push(
|
|
||||||
client.finality_notification_stream()
|
wait_for.push(
|
||||||
.take_while(move |n| {
|
Box::new(
|
||||||
let mut highest_finalized = highest_finalized.write();
|
client.finality_notification_stream()
|
||||||
if *n.header.number() > *highest_finalized {
|
.take_while(move |n| {
|
||||||
*highest_finalized = *n.header.number();
|
let mut highest_finalized = highest_finalized.write();
|
||||||
}
|
if *n.header.number() > *highest_finalized {
|
||||||
Ok(n.header.number() < &blocks)
|
*highest_finalized = *n.header.number();
|
||||||
})
|
}
|
||||||
.for_each(|_| Ok(()))
|
Ok(n.header.number() < &blocks)
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
.map(|_| ())
|
||||||
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
fn assert_send<T: Send>(_: &T) { }
|
fn assert_send<T: Send>(_: &T) { }
|
||||||
|
|
||||||
let voter = run_grandpa(
|
let voter = run_grandpa_voter(
|
||||||
Config {
|
Config {
|
||||||
gossip_duration: TEST_GOSSIP_DURATION,
|
gossip_duration: TEST_GOSSIP_DURATION,
|
||||||
justification_period: 32,
|
justification_period: 32,
|
||||||
@@ -427,7 +438,7 @@ fn run_to_completion_with<F: FnOnce()>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// wait for all finalized on each.
|
// wait for all finalized on each.
|
||||||
let wait_for = ::futures::future::join_all(finality_notifications)
|
let wait_for = ::futures::future::join_all(wait_for)
|
||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
.map_err(|_| ());
|
.map_err(|_| ());
|
||||||
|
|
||||||
@@ -441,17 +452,14 @@ fn run_to_completion_with<F: FnOnce()>(
|
|||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
.map_err(|_| ());
|
.map_err(|_| ());
|
||||||
|
|
||||||
(before_waiting)();
|
|
||||||
|
|
||||||
runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
|
runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
|
||||||
|
|
||||||
let highest_finalized = *highest_finalized.read();
|
let highest_finalized = *highest_finalized.read();
|
||||||
|
|
||||||
highest_finalized
|
highest_finalized
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_to_completion(blocks: u64, net: Arc<Mutex<GrandpaTestNet>>, peers: &[AuthorityKeyring]) -> u64 {
|
fn run_to_completion(blocks: u64, net: Arc<Mutex<GrandpaTestNet>>, peers: &[AuthorityKeyring]) -> u64 {
|
||||||
run_to_completion_with(blocks, net, peers, || {})
|
run_to_completion_with(blocks, net, peers, |_| None)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -478,7 +486,7 @@ fn finalize_3_voters_no_observers() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn finalize_3_voters_1_observer() {
|
fn finalize_3_voters_1_full_observer() {
|
||||||
let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie];
|
let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie];
|
||||||
let voters = make_ids(peers);
|
let voters = make_ids(peers);
|
||||||
|
|
||||||
@@ -509,7 +517,7 @@ fn finalize_3_voters_1_observer() {
|
|||||||
.take_while(|n| Ok(n.header.number() < &20))
|
.take_while(|n| Ok(n.header.number() < &20))
|
||||||
.for_each(move |_| Ok(()))
|
.for_each(move |_| Ok(()))
|
||||||
);
|
);
|
||||||
let voter = run_grandpa(
|
let voter = run_grandpa_voter(
|
||||||
Config {
|
Config {
|
||||||
gossip_duration: TEST_GOSSIP_DURATION,
|
gossip_duration: TEST_GOSSIP_DURATION,
|
||||||
justification_period: 32,
|
justification_period: 32,
|
||||||
@@ -539,7 +547,7 @@ fn finalize_3_voters_1_observer() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn transition_3_voters_twice_1_observer() {
|
fn transition_3_voters_twice_1_full_observer() {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
let peers_a = &[
|
let peers_a = &[
|
||||||
AuthorityKeyring::Alice,
|
AuthorityKeyring::Alice,
|
||||||
@@ -671,7 +679,7 @@ fn transition_3_voters_twice_1_observer() {
|
|||||||
assert_eq!(set.pending_changes().count(), 0);
|
assert_eq!(set.pending_changes().count(), 0);
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
let voter = run_grandpa(
|
let voter = run_grandpa_voter(
|
||||||
Config {
|
Config {
|
||||||
gossip_duration: TEST_GOSSIP_DURATION,
|
gossip_duration: TEST_GOSSIP_DURATION,
|
||||||
justification_period: 32,
|
justification_period: 32,
|
||||||
@@ -906,7 +914,7 @@ fn force_change_to_new_set() {
|
|||||||
let net = Arc::new(Mutex::new(net));
|
let net = Arc::new(Mutex::new(net));
|
||||||
|
|
||||||
let runner_net = net.clone();
|
let runner_net = net.clone();
|
||||||
let add_blocks = move || {
|
let add_blocks = move |_| {
|
||||||
net.lock().peer(0).push_blocks(1, false);
|
net.lock().peer(0).push_blocks(1, false);
|
||||||
|
|
||||||
{
|
{
|
||||||
@@ -938,6 +946,8 @@ fn force_change_to_new_set() {
|
|||||||
assert_eq!(set.current(), (1, voters.as_slice()));
|
assert_eq!(set.current(), (1, voters.as_slice()));
|
||||||
assert_eq!(set.pending_changes().count(), 0);
|
assert_eq!(set.pending_changes().count(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
// it will only finalize if the forced transition happens.
|
// it will only finalize if the forced transition happens.
|
||||||
@@ -1071,7 +1081,7 @@ fn voter_persists_its_votes() {
|
|||||||
let (_block_import, _, link) = net.lock().make_block_import(client.clone());
|
let (_block_import, _, link) = net.lock().make_block_import(client.clone());
|
||||||
let link = link.lock().take().unwrap();
|
let link = link.lock().take().unwrap();
|
||||||
|
|
||||||
let mut voter = run_grandpa(
|
let mut voter = run_grandpa_voter(
|
||||||
Config {
|
Config {
|
||||||
gossip_duration: TEST_GOSSIP_DURATION,
|
gossip_duration: TEST_GOSSIP_DURATION,
|
||||||
justification_period: 32,
|
justification_period: 32,
|
||||||
@@ -1225,3 +1235,44 @@ fn voter_persists_its_votes() {
|
|||||||
|
|
||||||
runtime.block_on(drive_to_completion.select(exit).map(|_| ()).map_err(|_| ())).unwrap();
|
runtime.block_on(drive_to_completion.select(exit).map(|_| ()).map_err(|_| ())).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn finalize_3_voters_1_light_observer() {
|
||||||
|
let _ = env_logger::try_init();
|
||||||
|
let authorities = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie];
|
||||||
|
let voters = make_ids(authorities);
|
||||||
|
|
||||||
|
let mut net = GrandpaTestNet::new(TestApi::new(voters), 4);
|
||||||
|
net.peer(0).push_blocks(20, false);
|
||||||
|
net.sync();
|
||||||
|
|
||||||
|
for i in 0..4 {
|
||||||
|
assert_eq!(net.peer(i).client().info().unwrap().chain.best_number, 20,
|
||||||
|
"Peer #{} failed to sync", i);
|
||||||
|
}
|
||||||
|
|
||||||
|
let net = Arc::new(Mutex::new(net));
|
||||||
|
let link = net.lock().peer(3).data.lock().take().expect("link initialized on startup; qed");
|
||||||
|
|
||||||
|
let finality_notifications = net.lock().peer(3).client().finality_notification_stream()
|
||||||
|
.take_while(|n| Ok(n.header.number() < &20))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
run_to_completion_with(20, net.clone(), authorities, |executor| {
|
||||||
|
executor.spawn(
|
||||||
|
run_grandpa_observer(
|
||||||
|
Config {
|
||||||
|
gossip_duration: TEST_GOSSIP_DURATION,
|
||||||
|
justification_period: 32,
|
||||||
|
local_key: None,
|
||||||
|
name: Some("observer".to_string()),
|
||||||
|
},
|
||||||
|
link,
|
||||||
|
MessageRouting::new(net.clone(), 3),
|
||||||
|
Exit,
|
||||||
|
).unwrap()
|
||||||
|
).unwrap();
|
||||||
|
|
||||||
|
Some(Box::new(finality_notifications.map(|_| ())))
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|||||||
@@ -110,19 +110,33 @@ construct_service_factory! {
|
|||||||
local_key
|
local_key
|
||||||
};
|
};
|
||||||
|
|
||||||
executor.spawn(grandpa::run_grandpa(
|
let config = grandpa::Config {
|
||||||
grandpa::Config {
|
local_key,
|
||||||
local_key,
|
// FIXME #1578 make this available through chainspec
|
||||||
// FIXME #1578 make this available through chainspec
|
gossip_duration: Duration::from_millis(333),
|
||||||
gossip_duration: Duration::from_millis(333),
|
justification_period: 4096,
|
||||||
justification_period: 4096,
|
name: Some(service.config.name.clone())
|
||||||
name: Some(service.config.name.clone())
|
};
|
||||||
|
|
||||||
|
match config.local_key {
|
||||||
|
None => {
|
||||||
|
executor.spawn(grandpa::run_grandpa_observer(
|
||||||
|
config,
|
||||||
|
link_half,
|
||||||
|
service.network(),
|
||||||
|
service.on_exit(),
|
||||||
|
)?);
|
||||||
},
|
},
|
||||||
link_half,
|
Some(_) => {
|
||||||
service.network(),
|
executor.spawn(grandpa::run_grandpa_voter(
|
||||||
service.config.custom.inherent_data_providers.clone(),
|
config,
|
||||||
service.on_exit(),
|
link_half,
|
||||||
)?);
|
service.network(),
|
||||||
|
service.config.custom.inherent_data_providers.clone(),
|
||||||
|
service.on_exit(),
|
||||||
|
)?);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
Ok(service)
|
Ok(service)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user