diff --git a/substrate/core/finality-grandpa/src/observer.rs b/substrate/core/finality-grandpa/src/observer.rs index 2532ee8098..bce292262e 100644 --- a/substrate/core/finality-grandpa/src/observer.rs +++ b/substrate/core/finality-grandpa/src/observer.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use futures::prelude::*; -use futures::future::{self, Loop as FutureLoop}; +use futures::{future, sync::mpsc}; use grandpa::{ BlockNumberOps, Error as GrandpaError, voter, voter_set::VoterSet @@ -31,7 +31,7 @@ use primitives::{H256, Blake2Hasher}; use crate::{ global_communication, CommandOrError, CommunicationIn, Config, environment, - LinkHalf, Network, aux_schema::PersistentData, VoterCommand, VoterSetState, + LinkHalf, Network, Error, aux_schema::PersistentData, VoterCommand, VoterSetState, }; use crate::authorities::SharedAuthoritySet; use crate::communication::NetworkBridge; @@ -171,116 +171,19 @@ pub fn run_grandpa_observer, N, RA, SC>( voter_commands_rx, } = link; - let PersistentData { authority_set, consensus_changes, set_state } = persistent_data; - let initial_state = (authority_set, consensus_changes, set_state.clone(), voter_commands_rx.into_future()); - - let (network, network_startup) = NetworkBridge::new(network, config.clone(), set_state, on_exit.clone()); - - let observer_work = future::loop_fn(initial_state, move |state| { - let (authority_set, consensus_changes, set_state, voter_commands_rx) = state; - let set_id = authority_set.set_id(); - let voters = Arc::new(authority_set.current_authorities()); - let client = client.clone(); - - // start global communication stream for the current set - let (global_in, _) = global_communication( - set_id, - &voters, - &client, - &network, - &config.keystore, - ); - - let last_finalized_number = client.info().chain.finalized_number; - - // NOTE: since we are not using `round_communication` we have to - // manually note the round with the gossip validator, otherwise we won't - // relay round messages. we want all full nodes to contribute to vote - // availability. - let note_round = { - let network = network.clone(); - let voters = voters.clone(); - - move |round| network.note_round( - crate::communication::Round(round), - crate::communication::SetId(set_id), - &*voters, - ) - }; - - // create observer for the current set - let observer = grandpa_observer( - &client, - &authority_set, - &consensus_changes, - &voters, - last_finalized_number, - global_in, - note_round, - ); - - let handle_voter_command = move |command, voter_commands_rx| { - // the observer doesn't use the voter set state, but we need to - // update it on-disk in case we restart as validator in the future. - let set_state = match command { - VoterCommand::Pause(reason) => { - info!(target: "afg", "Pausing old validator set: {}", reason); - - let completed_rounds = set_state.read().completed_rounds(); - let set_state = VoterSetState::Paused { completed_rounds }; - - #[allow(deprecated)] - crate::aux_schema::write_voter_set_state(&**client.backend(), &set_state)?; - - set_state - }, - VoterCommand::ChangeAuthorities(new) => { - // start the new authority set using the block where the - // set changed (not where the signal happened!) as the base. - let set_state = VoterSetState::live( - new.set_id, - &*authority_set.inner().read(), - (new.canon_hash, new.canon_number), - ); - - #[allow(deprecated)] - crate::aux_schema::write_voter_set_state(&**client.backend(), &set_state)?; - - set_state - }, - }; - - Ok(FutureLoop::Continue((authority_set, consensus_changes, set_state.into(), voter_commands_rx))) - }; - - // run observer and listen to commands (switch authorities or pause) - observer.select2(voter_commands_rx).then(move |res| match res { - Ok(future::Either::A((_, _))) => { - // observer commit stream doesn't conclude naturally; this could reasonably be an error. - Ok(FutureLoop::Break(())) - }, - Err(future::Either::B(_)) => { - // the `voter_commands_rx` stream should not fail. - Ok(FutureLoop::Break(())) - }, - Ok(future::Either::B(((None, _), _))) => { - // the `voter_commands_rx` stream should never conclude since it's never closed. - Ok(FutureLoop::Break(())) - }, - Err(future::Either::A((CommandOrError::Error(e), _))) => { - // return inner observer error - Err(e) - }, - Ok(future::Either::B(((Some(command), voter_commands_rx), _))) => { - // some command issued externally - handle_voter_command(command, voter_commands_rx.into_future()) - }, - Err(future::Either::A((CommandOrError::VoterCommand(command), voter_commands_rx))) => { - // some command issued internally - handle_voter_command(command, voter_commands_rx) - }, - }) - }); + let (network, network_startup) = NetworkBridge::new( + network, + config.clone(), + persistent_data.set_state.clone(), + on_exit.clone() + ); + let observer_work = ObserverWork::new( + client, + network, + persistent_data, + config.keystore.clone(), + voter_commands_rx + ); let observer_work = observer_work .map(|_| ()) @@ -292,3 +195,184 @@ pub fn run_grandpa_observer, N, RA, SC>( Ok(observer_work.select(on_exit).map(|_| ()).map_err(|_| ())) } + +/// Future that powers the observer. +#[must_use] +struct ObserverWork, N: Network, E, Backend, RA> { + observer: Box>> + Send>, + client: Arc>, + network: NetworkBridge, + persistent_data: PersistentData, + keystore: Option, + voter_commands_rx: mpsc::UnboundedReceiver>>, +} + +impl ObserverWork +where + B: BlockT, + N: Network, + N::In: Send + 'static, + NumberFor: BlockNumberOps, + RA: 'static + Send + Sync, + E: CallExecutor + Send + Sync + 'static, + Bk: Backend + 'static, +{ + fn new( + client: Arc>, + network: NetworkBridge, + persistent_data: PersistentData, + keystore: Option, + voter_commands_rx: mpsc::UnboundedReceiver>>, + ) -> Self { + + let mut work = ObserverWork { + // `observer` is set to a temporary value and replaced below when + // calling `rebuild_observer`. + observer: Box::new(futures::empty()) as Box<_>, + client, + network, + persistent_data, + keystore, + voter_commands_rx, + }; + work.rebuild_observer(); + work + } + + /// Rebuilds the `self.observer` field using the current authority set + /// state. This method should be called when we know that the authority set + /// has changed (e.g. as signalled by a voter command). + fn rebuild_observer(&mut self) { + let set_id = self.persistent_data.authority_set.set_id(); + let voters = Arc::new(self.persistent_data.authority_set.current_authorities()); + + // start global communication stream for the current set + let (global_in, _) = global_communication( + set_id, + &voters, + &self.client, + &self.network, + &self.keystore, + ); + + let last_finalized_number = self.client.info().chain.finalized_number; + + // NOTE: since we are not using `round_communication` we have to + // manually note the round with the gossip validator, otherwise we won't + // relay round messages. we want all full nodes to contribute to vote + // availability. + let note_round = { + let network = self.network.clone(); + let voters = voters.clone(); + + move |round| network.note_round( + crate::communication::Round(round), + crate::communication::SetId(set_id), + &*voters, + ) + }; + + // create observer for the current set + let observer = grandpa_observer( + &self.client, + &self.persistent_data.authority_set, + &self.persistent_data.consensus_changes, + &voters, + last_finalized_number, + global_in, + note_round, + ); + + self.observer = Box::new(observer); + } + + fn handle_voter_command( + &mut self, + command: VoterCommand>, + ) -> Result<(), Error> { + // the observer doesn't use the voter set state, but we need to + // update it on-disk in case we restart as validator in the future. + self.persistent_data.set_state = match command { + VoterCommand::Pause(reason) => { + info!(target: "afg", "Pausing old validator set: {}", reason); + + let completed_rounds = self.persistent_data.set_state.read().completed_rounds(); + let set_state = VoterSetState::Paused { completed_rounds }; + + #[allow(deprecated)] + crate::aux_schema::write_voter_set_state(&**self.client.backend(), &set_state)?; + + set_state + }, + VoterCommand::ChangeAuthorities(new) => { + // start the new authority set using the block where the + // set changed (not where the signal happened!) as the base. + let set_state = VoterSetState::live( + new.set_id, + &*self.persistent_data.authority_set.inner().read(), + (new.canon_hash, new.canon_number), + ); + + #[allow(deprecated)] + crate::aux_schema::write_voter_set_state(&**self.client.backend(), &set_state)?; + + set_state + }, + }.into(); + + self.rebuild_observer(); + Ok(()) + } +} + +impl Future for ObserverWork +where + B: BlockT, + N: Network, + N::In: Send + 'static, + NumberFor: BlockNumberOps, + RA: 'static + Send + Sync, + E: CallExecutor + Send + Sync + 'static, + Bk: Backend + 'static, +{ + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + match self.observer.poll() { + Ok(Async::NotReady) => {} + Ok(Async::Ready(())) => { + // observer commit stream doesn't conclude naturally; this could reasonably be an error. + return Ok(Async::Ready(())) + } + Err(CommandOrError::Error(e)) => { + // return inner observer error + return Err(e) + } + Err(CommandOrError::VoterCommand(command)) => { + // some command issued internally + self.handle_voter_command(command)?; + futures::task::current().notify(); + } + } + + match self.voter_commands_rx.poll() { + Ok(Async::NotReady) => {} + Err(_) => { + // the `voter_commands_rx` stream should not fail. + return Ok(Async::Ready(())) + } + Ok(Async::Ready(None)) => { + // the `voter_commands_rx` stream should never conclude since it's never closed. + return Ok(Async::Ready(())) + } + Ok(Async::Ready(Some(command))) => { + // some command issued externally + self.handle_voter_command(command)?; + futures::task::current().notify(); + } + } + + Ok(Async::NotReady) + } +}