mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 00:31:02 +00:00
Offline fallback for GRANDPA (#1619)
Co-authored-by: André Silva <andre.beat@gmail.com> * skeleton for finality tracker * dispatch events when nothing finalized for a long time * begin integrating finality tracker into grandpa * add delay field to pending change * add has_api_with function to sr_version for querying APIs * partially integrate new force changes into grandpa * implement forced changes * get srml-grandpa compiling * Update core/finality-grandpa/src/authorities.rs Co-Authored-By: rphmeier <rphmeier@gmail.com> * Update core/finality-grandpa/src/authorities.rs Co-Authored-By: rphmeier <rphmeier@gmail.com> * Update core/finality-grandpa/src/authorities.rs Co-Authored-By: rphmeier <rphmeier@gmail.com> * remove explicit dependence on CoreApi * increase node runtime version * integrate grandpa forced changes into node runtime * add some tests to finality-tracker * integrate finality tracking into node-runtime * test forced-change logic * test forced changes in the authority-set handler * kill some unneeded bounds in client * test forced-changes in finality-grandpa and fix logic * build wasm and finality-tracker is no-std * restart voter on forced change * allow returning custom error type from lock_import_and_run * extract out most DB logic to aux_schema and use atomic client ops * unify authority set writing * implement set pausing * bump runtime version * note on DB when we pause. * core: grandpa: integrate forced changes with multiple pending standard changes * core: grandpa: fix AuthoritySet tests * runtime: bump impl_version * core: clear pending justification requests after forced change import * srml: finality-tracker: use FinalizedInherentData * core: log requests for clearing justification requests * core, node: update runtimes * core: grandpa: fix tests * core: grandpa: remove todos and add comments * core: grandpa: use has_api_with from ApiExt * core: fix tests * core: grandpa: remove unnecessary mut modifier * core: replace PostImportActions bitflags with struct * core: grandpa: restrict genesis on forced authority set change * core: grandpa: add more docs * core: grandpa: prevent safety violations in Environment::finalize_block * core: grandpa: register finality tracker inherent data provider * core: grandpa: fix tests * node: update runtime blobs * core: grandpa: remove outdated todo * core: aura: fix typo in log message * core: grandpa: check re-finalization is on canonical chain * srml: finality-tracker: fix initialization * node: update runtime wasm * srml: finality-tracker: don't re-initialize config keys
This commit is contained in:
committed by
André Silva
parent
128d164f2b
commit
dfb48a2405
@@ -66,21 +66,27 @@ use runtime_primitives::traits::{
|
||||
DigestItemFor, DigestItem,
|
||||
};
|
||||
use fg_primitives::GrandpaApi;
|
||||
use inherents::InherentDataProviders;
|
||||
use runtime_primitives::generic::BlockId;
|
||||
use substrate_primitives::{ed25519, H256, Ed25519AuthorityId, Blake2Hasher};
|
||||
use substrate_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_WARN};
|
||||
|
||||
use srml_finality_tracker;
|
||||
|
||||
use grandpa::Error as GrandpaError;
|
||||
use grandpa::{voter, round::State as RoundState, BlockNumberOps, VoterSet};
|
||||
|
||||
use network::Service as NetworkService;
|
||||
use network::consensus_gossip as network_gossip;
|
||||
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
pub use fg_primitives::ScheduledChange;
|
||||
|
||||
mod authorities;
|
||||
mod aux_schema;
|
||||
mod communication;
|
||||
mod consensus_changes;
|
||||
mod environment;
|
||||
@@ -94,9 +100,8 @@ mod service_integration;
|
||||
#[cfg(feature="service-integration")]
|
||||
pub use service_integration::{LinkHalfForService, BlockImportForService};
|
||||
|
||||
use authorities::SharedAuthoritySet;
|
||||
use consensus_changes::{ConsensusChanges, SharedConsensusChanges};
|
||||
use environment::{Environment, ExitOrError, NewAuthoritySet};
|
||||
use aux_schema::{PersistentData, VoterSetState};
|
||||
use environment::Environment;
|
||||
pub use finality_proof::{prove_finality, check_finality_proof};
|
||||
use import::GrandpaBlockImport;
|
||||
use until_imported::UntilCommitBlocksImported;
|
||||
@@ -104,17 +109,9 @@ use until_imported::UntilCommitBlocksImported;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
const LAST_COMPLETED_KEY: &[u8] = b"grandpa_completed_round";
|
||||
const AUTHORITY_SET_KEY: &[u8] = b"grandpa_voters";
|
||||
const CONSENSUS_CHANGES_KEY: &[u8] = b"grandpa_consensus_changes";
|
||||
|
||||
const GRANDPA_ENGINE_ID: network::ConsensusEngineId = [b'a', b'f', b'g', b'1'];
|
||||
|
||||
const MESSAGE_ROUND_TOLERANCE: u64 = 2;
|
||||
|
||||
/// round-number, round-state
|
||||
type LastCompleted<H, N> = (u64, RoundState<H, N>);
|
||||
|
||||
/// A GRANDPA message for a substrate chain.
|
||||
pub type Message<Block> = grandpa::Message<<Block as BlockT>::Hash, NumberFor<Block>>;
|
||||
/// A signed message.
|
||||
@@ -557,13 +554,81 @@ impl<B, E, Block: BlockT<Hash=H256>, RA> BlockStatus<Block> for Arc<Client<B, E,
|
||||
}
|
||||
}
|
||||
|
||||
/// Half of a link between a block-import worker and a the background voter.
|
||||
// This should remain non-clone.
|
||||
/// A new authority set along with the canonical block it changed at.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct NewAuthoritySet<H, N> {
|
||||
pub(crate) canon_number: N,
|
||||
pub(crate) canon_hash: H,
|
||||
pub(crate) set_id: u64,
|
||||
pub(crate) authorities: Vec<(Ed25519AuthorityId, u64)>,
|
||||
}
|
||||
|
||||
/// Commands issued to the voter.
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum VoterCommand<H, N> {
|
||||
/// Pause the voter for given reason.
|
||||
Pause(String),
|
||||
/// New authorities.
|
||||
ChangeAuthorities(NewAuthoritySet<H, N>)
|
||||
}
|
||||
|
||||
impl<H, N> fmt::Display for VoterCommand<H, N> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self {
|
||||
VoterCommand::Pause(ref reason) => write!(f, "Pausing voter: {}", reason),
|
||||
VoterCommand::ChangeAuthorities(_) => write!(f, "Changing authorities"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Signals either an early exit of a voter or an error.
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum CommandOrError<H, N> {
|
||||
/// An error occurred.
|
||||
Error(Error),
|
||||
/// A command to the voter.
|
||||
VoterCommand(VoterCommand<H, N>),
|
||||
}
|
||||
|
||||
impl<H, N> From<Error> for CommandOrError<H, N> {
|
||||
fn from(e: Error) -> Self {
|
||||
CommandOrError::Error(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl<H, N> From<ClientError> for CommandOrError<H, N> {
|
||||
fn from(e: ClientError) -> Self {
|
||||
CommandOrError::Error(Error::Client(e))
|
||||
}
|
||||
}
|
||||
|
||||
impl<H, N> From<grandpa::Error> for CommandOrError<H, N> {
|
||||
fn from(e: grandpa::Error) -> Self {
|
||||
CommandOrError::Error(Error::from(e))
|
||||
}
|
||||
}
|
||||
|
||||
impl<H, N> From<VoterCommand<H, N>> for CommandOrError<H, N> {
|
||||
fn from(e: VoterCommand<H, N>) -> Self {
|
||||
CommandOrError::VoterCommand(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl<H: fmt::Debug, N: fmt::Debug> ::std::error::Error for CommandOrError<H, N> { }
|
||||
|
||||
impl<H, N> fmt::Display for CommandOrError<H, N> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self {
|
||||
CommandOrError::Error(ref e) => write!(f, "{:?}", e),
|
||||
CommandOrError::VoterCommand(ref cmd) => write!(f, "{}", cmd),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LinkHalf<B, E, Block: BlockT<Hash=H256>, RA> {
|
||||
client: Arc<Client<B, E, Block, RA>>,
|
||||
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
|
||||
authority_set_change: mpsc::UnboundedReceiver<NewAuthoritySet<Block::Hash, NumberFor<Block>>>,
|
||||
consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
|
||||
persistent_data: PersistentData<Block::Hash, NumberFor<Block>>,
|
||||
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
|
||||
}
|
||||
|
||||
/// Make block importer and link half necessary to tie the background voter
|
||||
@@ -577,60 +642,41 @@ pub fn block_import<B, E, Block: BlockT<Hash=H256>, RA, PRA>(
|
||||
E: CallExecutor<Block, Blake2Hasher> + 'static + Clone + Send + Sync,
|
||||
RA: Send + Sync,
|
||||
PRA: ProvideRuntimeApi,
|
||||
PRA::Api: GrandpaApi<Block>
|
||||
PRA::Api: GrandpaApi<Block>,
|
||||
{
|
||||
use runtime_primitives::traits::Zero;
|
||||
let authority_set = match Backend::get_aux(&**client.backend(), AUTHORITY_SET_KEY)? {
|
||||
None => {
|
||||
info!(target: "afg", "Loading GRANDPA authorities \
|
||||
from genesis on what appears to be first startup.");
|
||||
|
||||
// no authority set on disk: fetch authorities from genesis state.
|
||||
// if genesis state is not available, we may be a light client, but these
|
||||
// are unsupported for following GRANDPA directly.
|
||||
let chain_info = client.info()?;
|
||||
let genesis_hash = chain_info.chain.genesis_hash;
|
||||
|
||||
let persistent_data = aux_schema::load_persistent(
|
||||
&**client.backend(),
|
||||
genesis_hash,
|
||||
<NumberFor<Block>>::zero(),
|
||||
|| {
|
||||
let genesis_authorities = api.runtime_api()
|
||||
.grandpa_authorities(&BlockId::number(Zero::zero()))?;
|
||||
telemetry!(CONSENSUS_DEBUG; "afg.loading_authorities";
|
||||
"authorities_len" => ?genesis_authorities.len()
|
||||
);
|
||||
|
||||
let authority_set = SharedAuthoritySet::genesis(genesis_authorities);
|
||||
let encoded = authority_set.inner().read().encode();
|
||||
Backend::insert_aux(&**client.backend(), &[(AUTHORITY_SET_KEY, &encoded[..])], &[])?;
|
||||
|
||||
authority_set
|
||||
Ok(genesis_authorities)
|
||||
}
|
||||
Some(raw) => crate::authorities::AuthoritySet::decode(&mut &raw[..])
|
||||
.ok_or_else(|| ::client::error::ErrorKind::Backend(
|
||||
format!("GRANDPA authority set kept in invalid format")
|
||||
))?
|
||||
.into(),
|
||||
};
|
||||
)?;
|
||||
|
||||
let consensus_changes = Backend::get_aux(&**client.backend(), CONSENSUS_CHANGES_KEY)?;
|
||||
let consensus_changes = Arc::new(parking_lot::Mutex::new(match consensus_changes {
|
||||
Some(raw) => ConsensusChanges::decode(&mut &raw[..])
|
||||
.ok_or_else(|| ::client::error::ErrorKind::Backend(
|
||||
format!("GRANDPA consensus changes kept in invalid format")
|
||||
))?,
|
||||
None => ConsensusChanges::empty(),
|
||||
}));
|
||||
|
||||
let (authority_set_change_tx, authority_set_change_rx) = mpsc::unbounded();
|
||||
let (voter_commands_tx, voter_commands_rx) = mpsc::unbounded();
|
||||
|
||||
Ok((
|
||||
GrandpaBlockImport::new(
|
||||
client.clone(),
|
||||
authority_set.clone(),
|
||||
authority_set_change_tx,
|
||||
consensus_changes.clone(),
|
||||
persistent_data.authority_set.clone(),
|
||||
voter_commands_tx,
|
||||
persistent_data.consensus_changes.clone(),
|
||||
api,
|
||||
),
|
||||
LinkHalf {
|
||||
client,
|
||||
authority_set,
|
||||
authority_set_change: authority_set_change_rx,
|
||||
consensus_changes,
|
||||
persistent_data,
|
||||
voter_commands_rx,
|
||||
},
|
||||
))
|
||||
}
|
||||
@@ -644,11 +690,11 @@ fn committer_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
|
||||
) -> (
|
||||
impl Stream<
|
||||
Item = (u64, ::grandpa::CompactCommit<H256, NumberFor<Block>, ed25519::Signature, Ed25519AuthorityId>),
|
||||
Error = ExitOrError<H256, NumberFor<Block>>,
|
||||
Error = CommandOrError<H256, NumberFor<Block>>,
|
||||
>,
|
||||
impl Sink<
|
||||
SinkItem = (u64, ::grandpa::Commit<H256, NumberFor<Block>, ed25519::Signature, Ed25519AuthorityId>),
|
||||
SinkError = ExitOrError<H256, NumberFor<Block>>,
|
||||
SinkError = CommandOrError<H256, NumberFor<Block>>,
|
||||
>,
|
||||
) where
|
||||
B: Backend<Block, Blake2Hasher>,
|
||||
@@ -687,12 +733,37 @@ fn committer_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
|
||||
(commit_in, commit_out)
|
||||
}
|
||||
|
||||
/// Register the finality tracker inherent data provider (which is used by
|
||||
/// GRANDPA), if not registered already.
|
||||
fn register_finality_tracker_inherent_data_provider<B, E, Block: BlockT<Hash=H256>, RA>(
|
||||
client: Arc<Client<B, E, Block, RA>>,
|
||||
inherent_data_providers: &InherentDataProviders,
|
||||
) -> Result<(), consensus_common::Error> where
|
||||
B: Backend<Block, Blake2Hasher> + 'static,
|
||||
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
|
||||
RA: Send + Sync + 'static,
|
||||
{
|
||||
if !inherent_data_providers.has_provider(&srml_finality_tracker::INHERENT_IDENTIFIER) {
|
||||
inherent_data_providers
|
||||
.register_provider(srml_finality_tracker::InherentDataProvider::new(move || {
|
||||
match client.backend().blockchain().info() {
|
||||
Err(e) => Err(std::borrow::Cow::Owned(e.to_string())),
|
||||
Ok(info) => Ok(info.finalized_number),
|
||||
}
|
||||
}))
|
||||
.map_err(|err| consensus_common::ErrorKind::InherentData(err.into()).into())
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Run a GRANDPA voter as a task. Provide configuration and a link to a
|
||||
/// block import worker that has already been instantiated with `block_import`.
|
||||
pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
|
||||
config: Config,
|
||||
link: LinkHalf<B, E, Block, RA>,
|
||||
network: N,
|
||||
inherent_data_providers: InherentDataProviders,
|
||||
on_exit: impl Future<Item=(),Error=()> + Send + 'static,
|
||||
) -> ::client::error::Result<impl Future<Item=(),Error=()> + Send + 'static> where
|
||||
Block::Hash: Ord,
|
||||
@@ -706,29 +777,18 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
|
||||
RA: Send + Sync + 'static,
|
||||
{
|
||||
use futures::future::{self, Loop as FutureLoop};
|
||||
use runtime_primitives::traits::Zero;
|
||||
|
||||
let LinkHalf {
|
||||
client,
|
||||
authority_set,
|
||||
authority_set_change,
|
||||
consensus_changes,
|
||||
persistent_data,
|
||||
voter_commands_rx,
|
||||
} = link;
|
||||
|
||||
let chain_info = client.info()?;
|
||||
let genesis_hash = chain_info.chain.genesis_hash;
|
||||
|
||||
// we shadow network with the wrapping/rebroadcasting network to avoid
|
||||
// accidental reuse.
|
||||
let (broadcast_worker, network) = communication::rebroadcasting_network(network);
|
||||
let PersistentData { authority_set, set_state, consensus_changes } = persistent_data;
|
||||
|
||||
let (last_round_number, last_state) = match Backend::get_aux(&**client.backend(), LAST_COMPLETED_KEY)? {
|
||||
None => (0, RoundState::genesis((genesis_hash, <NumberFor<Block>>::zero()))),
|
||||
Some(raw) => LastCompleted::decode(&mut &raw[..])
|
||||
.ok_or_else(|| ::client::error::ErrorKind::Backend(
|
||||
format!("Last GRANDPA round state kept in invalid format")
|
||||
))?
|
||||
};
|
||||
register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?;
|
||||
|
||||
let voters = authority_set.current_authorities();
|
||||
|
||||
@@ -740,95 +800,131 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
|
||||
set_id: authority_set.set_id(),
|
||||
authority_set: authority_set.clone(),
|
||||
consensus_changes: consensus_changes.clone(),
|
||||
last_completed: environment::LastCompletedRound::new(set_state.round()),
|
||||
});
|
||||
|
||||
let initial_state = (initial_environment, last_round_number, last_state, authority_set_change.into_future());
|
||||
let initial_state = (initial_environment, set_state, voter_commands_rx.into_future());
|
||||
let voter_work = future::loop_fn(initial_state, move |params| {
|
||||
let (env, last_round_number, last_state, authority_set_change) = params;
|
||||
let (env, set_state, voter_commands_rx) = params;
|
||||
debug!(target: "afg", "{}: Starting new voter with set ID {}", config.name(), env.set_id);
|
||||
telemetry!(CONSENSUS_DEBUG; "afg.starting_new_voter";
|
||||
"name" => ?config.name(), "set_id" => ?env.set_id
|
||||
);
|
||||
|
||||
let chain_info = match client.info() {
|
||||
Ok(i) => i,
|
||||
Err(e) => return future::Either::B(future::err(Error::Client(e))),
|
||||
let mut maybe_voter = match set_state.clone() {
|
||||
VoterSetState::Live(last_round_number, last_round_state) => {
|
||||
let chain_info = match client.info() {
|
||||
Ok(i) => i,
|
||||
Err(e) => return future::Either::B(future::err(Error::Client(e))),
|
||||
};
|
||||
|
||||
let last_finalized = (
|
||||
chain_info.chain.finalized_hash,
|
||||
chain_info.chain.finalized_number,
|
||||
);
|
||||
|
||||
let committer_data = committer_communication(
|
||||
config.local_key.clone(),
|
||||
env.set_id,
|
||||
&env.voters,
|
||||
&client,
|
||||
&network,
|
||||
);
|
||||
|
||||
let voters = (*env.voters).clone();
|
||||
|
||||
Some(voter::Voter::new(
|
||||
env.clone(),
|
||||
voters,
|
||||
committer_data,
|
||||
last_round_number,
|
||||
last_round_state,
|
||||
last_finalized,
|
||||
))
|
||||
}
|
||||
VoterSetState::Paused(_, _) => None,
|
||||
};
|
||||
|
||||
let last_finalized = (
|
||||
chain_info.chain.finalized_hash,
|
||||
chain_info.chain.finalized_number,
|
||||
);
|
||||
// needs to be combined with another future otherwise it can deadlock.
|
||||
let poll_voter = future::poll_fn(move || match maybe_voter {
|
||||
Some(ref mut voter) => voter.poll(),
|
||||
None => Ok(Async::NotReady),
|
||||
});
|
||||
|
||||
let committer_data = committer_communication(
|
||||
config.local_key.clone(),
|
||||
env.set_id,
|
||||
&env.voters,
|
||||
&client,
|
||||
&network,
|
||||
);
|
||||
|
||||
let voters = (*env.voters).clone();
|
||||
|
||||
let voter = voter::Voter::new(
|
||||
env,
|
||||
voters,
|
||||
committer_data,
|
||||
last_round_number,
|
||||
last_state,
|
||||
last_finalized,
|
||||
);
|
||||
let client = client.clone();
|
||||
let config = config.clone();
|
||||
let network = network.clone();
|
||||
let authority_set = authority_set.clone();
|
||||
let consensus_changes = consensus_changes.clone();
|
||||
|
||||
let trigger_authority_set_change = |new: NewAuthoritySet<_, _>, authority_set_change| {
|
||||
let env = Arc::new(Environment {
|
||||
inner: client,
|
||||
config,
|
||||
voters: Arc::new(new.authorities.into_iter().collect()),
|
||||
set_id: new.set_id,
|
||||
network,
|
||||
authority_set,
|
||||
consensus_changes,
|
||||
});
|
||||
let handle_voter_command = move |command: VoterCommand<_, _>, voter_commands_rx| {
|
||||
match command {
|
||||
VoterCommand::ChangeAuthorities(new) => {
|
||||
// start the new authority set using the block where the
|
||||
// set changed (not where the signal happened!) as the base.
|
||||
let genesis_state = RoundState::genesis((new.canon_hash, new.canon_number));
|
||||
let env = Arc::new(Environment {
|
||||
inner: client,
|
||||
config,
|
||||
voters: Arc::new(new.authorities.into_iter().collect()),
|
||||
set_id: new.set_id,
|
||||
network,
|
||||
authority_set,
|
||||
consensus_changes,
|
||||
last_completed: environment::LastCompletedRound::new(
|
||||
(0, genesis_state.clone())
|
||||
),
|
||||
});
|
||||
|
||||
// start the new authority set using the block where the
|
||||
// set changed (not where the signal happened!) as the base.
|
||||
Ok(FutureLoop::Continue((
|
||||
env,
|
||||
0, // always start at round 0 when changing sets.
|
||||
RoundState::genesis((new.canon_hash, new.canon_number)),
|
||||
authority_set_change,
|
||||
)))
|
||||
|
||||
let set_state = VoterSetState::Live(
|
||||
0, // always start at round 0 when changing sets.
|
||||
genesis_state,
|
||||
);
|
||||
|
||||
Ok(FutureLoop::Continue((env, set_state, voter_commands_rx)))
|
||||
}
|
||||
VoterCommand::Pause(reason) => {
|
||||
info!(target: "afg", "Pausing old validator set: {}", reason);
|
||||
|
||||
// not racing because old voter is shut down.
|
||||
let (last_round_number, last_round_state) = env.last_completed.read();
|
||||
let set_state = VoterSetState::Paused(
|
||||
last_round_number,
|
||||
last_round_state,
|
||||
);
|
||||
|
||||
aux_schema::write_voter_set_state(&**client.backend(), &set_state)?;
|
||||
|
||||
Ok(FutureLoop::Continue((env, set_state, voter_commands_rx)))
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
future::Either::A(voter.select2(authority_set_change).then(move |res| match res {
|
||||
future::Either::A(poll_voter.select2(voter_commands_rx).then(move |res| match res {
|
||||
Ok(future::Either::A(((), _))) => {
|
||||
// voters don't conclude naturally; this could reasonably be an error.
|
||||
Ok(FutureLoop::Break(()))
|
||||
},
|
||||
Err(future::Either::B(_)) => {
|
||||
// the `authority_set_change` stream should not fail.
|
||||
// the `voter_commands_rx` stream should not fail.
|
||||
Ok(FutureLoop::Break(()))
|
||||
},
|
||||
Ok(future::Either::B(((None, _), _))) => {
|
||||
// the `authority_set_change` stream should never conclude since it's never closed.
|
||||
// the `voter_commands_rx` stream should never conclude since it's never closed.
|
||||
Ok(FutureLoop::Break(()))
|
||||
},
|
||||
Err(future::Either::A((ExitOrError::Error(e), _))) => {
|
||||
Err(future::Either::A((CommandOrError::Error(e), _))) => {
|
||||
// return inner voter error
|
||||
Err(e)
|
||||
}
|
||||
Ok(future::Either::B(((Some(new), authority_set_change), _))) => {
|
||||
// authority set change triggered externally through the channel
|
||||
trigger_authority_set_change(new, authority_set_change.into_future())
|
||||
Ok(future::Either::B(((Some(command), voter_commands_rx), _))) => {
|
||||
// some command issued externally.
|
||||
handle_voter_command(command, voter_commands_rx.into_future())
|
||||
}
|
||||
Err(future::Either::A((ExitOrError::AuthoritiesChanged(new), authority_set_change))) => {
|
||||
// authority set change triggered internally by finalizing a change block
|
||||
trigger_authority_set_change(new, authority_set_change)
|
||||
Err(future::Either::A((CommandOrError::VoterCommand(command), voter_commands_rx))) => {
|
||||
// some command issued internally.
|
||||
handle_voter_command(command, voter_commands_rx)
|
||||
},
|
||||
}))
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user