Rewrite the GrandPa observer work future (#3309)

* Rewrite the observer work future

* Line widths

* Update core/finality-grandpa/src/observer.rs

Co-Authored-By: André Silva <andre.beat@gmail.com>
This commit is contained in:
Pierre Krieger
2019-08-12 12:49:51 +02:00
committed by Gavin Wood
parent 6fa84bae0f
commit 4f051a5784
+196 -112
View File
@@ -17,7 +17,7 @@
use std::sync::Arc;
use futures::prelude::*;
use futures::future::{self, Loop as FutureLoop};
use futures::{future, sync::mpsc};
use grandpa::{
BlockNumberOps, Error as GrandpaError, voter, voter_set::VoterSet
@@ -31,7 +31,7 @@ use primitives::{H256, Blake2Hasher};
use crate::{
global_communication, CommandOrError, CommunicationIn, Config, environment,
LinkHalf, Network, aux_schema::PersistentData, VoterCommand, VoterSetState,
LinkHalf, Network, Error, aux_schema::PersistentData, VoterCommand, VoterSetState,
};
use crate::authorities::SharedAuthoritySet;
use crate::communication::NetworkBridge;
@@ -171,116 +171,19 @@ pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC>(
voter_commands_rx,
} = link;
let PersistentData { authority_set, consensus_changes, set_state } = persistent_data;
let initial_state = (authority_set, consensus_changes, set_state.clone(), voter_commands_rx.into_future());
let (network, network_startup) = NetworkBridge::new(network, config.clone(), set_state, on_exit.clone());
let observer_work = future::loop_fn(initial_state, move |state| {
let (authority_set, consensus_changes, set_state, voter_commands_rx) = state;
let set_id = authority_set.set_id();
let voters = Arc::new(authority_set.current_authorities());
let client = client.clone();
// start global communication stream for the current set
let (global_in, _) = global_communication(
set_id,
&voters,
&client,
&network,
&config.keystore,
);
let last_finalized_number = client.info().chain.finalized_number;
// NOTE: since we are not using `round_communication` we have to
// manually note the round with the gossip validator, otherwise we won't
// relay round messages. we want all full nodes to contribute to vote
// availability.
let note_round = {
let network = network.clone();
let voters = voters.clone();
move |round| network.note_round(
crate::communication::Round(round),
crate::communication::SetId(set_id),
&*voters,
)
};
// create observer for the current set
let observer = grandpa_observer(
&client,
&authority_set,
&consensus_changes,
&voters,
last_finalized_number,
global_in,
note_round,
);
let handle_voter_command = move |command, voter_commands_rx| {
// the observer doesn't use the voter set state, but we need to
// update it on-disk in case we restart as validator in the future.
let set_state = match command {
VoterCommand::Pause(reason) => {
info!(target: "afg", "Pausing old validator set: {}", reason);
let completed_rounds = set_state.read().completed_rounds();
let set_state = VoterSetState::Paused { completed_rounds };
#[allow(deprecated)]
crate::aux_schema::write_voter_set_state(&**client.backend(), &set_state)?;
set_state
},
VoterCommand::ChangeAuthorities(new) => {
// start the new authority set using the block where the
// set changed (not where the signal happened!) as the base.
let set_state = VoterSetState::live(
new.set_id,
&*authority_set.inner().read(),
(new.canon_hash, new.canon_number),
);
#[allow(deprecated)]
crate::aux_schema::write_voter_set_state(&**client.backend(), &set_state)?;
set_state
},
};
Ok(FutureLoop::Continue((authority_set, consensus_changes, set_state.into(), voter_commands_rx)))
};
// run observer and listen to commands (switch authorities or pause)
observer.select2(voter_commands_rx).then(move |res| match res {
Ok(future::Either::A((_, _))) => {
// observer commit stream doesn't conclude naturally; this could reasonably be an error.
Ok(FutureLoop::Break(()))
},
Err(future::Either::B(_)) => {
// the `voter_commands_rx` stream should not fail.
Ok(FutureLoop::Break(()))
},
Ok(future::Either::B(((None, _), _))) => {
// the `voter_commands_rx` stream should never conclude since it's never closed.
Ok(FutureLoop::Break(()))
},
Err(future::Either::A((CommandOrError::Error(e), _))) => {
// return inner observer error
Err(e)
},
Ok(future::Either::B(((Some(command), voter_commands_rx), _))) => {
// some command issued externally
handle_voter_command(command, voter_commands_rx.into_future())
},
Err(future::Either::A((CommandOrError::VoterCommand(command), voter_commands_rx))) => {
// some command issued internally
handle_voter_command(command, voter_commands_rx)
},
})
});
let (network, network_startup) = NetworkBridge::new(
network,
config.clone(),
persistent_data.set_state.clone(),
on_exit.clone()
);
let observer_work = ObserverWork::new(
client,
network,
persistent_data,
config.keystore.clone(),
voter_commands_rx
);
let observer_work = observer_work
.map(|_| ())
@@ -292,3 +195,184 @@ pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC>(
Ok(observer_work.select(on_exit).map(|_| ()).map_err(|_| ()))
}
/// Future that powers the observer.
#[must_use]
struct ObserverWork<B: BlockT<Hash=H256>, N: Network<B>, E, Backend, RA> {
observer: Box<dyn Future<Item = (), Error = CommandOrError<B::Hash, NumberFor<B>>> + Send>,
client: Arc<Client<Backend, E, B, RA>>,
network: NetworkBridge<B, N>,
persistent_data: PersistentData<B>,
keystore: Option<keystore::KeyStorePtr>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
}
impl<B, N, E, Bk, RA> ObserverWork<B, N, E, Bk, RA>
where
B: BlockT<Hash=H256>,
N: Network<B>,
N::In: Send + 'static,
NumberFor<B>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<B, Blake2Hasher> + Send + Sync + 'static,
Bk: Backend<B, Blake2Hasher> + 'static,
{
fn new(
client: Arc<Client<Bk, E, B, RA>>,
network: NetworkBridge<B, N>,
persistent_data: PersistentData<B>,
keystore: Option<keystore::KeyStorePtr>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
) -> Self {
let mut work = ObserverWork {
// `observer` is set to a temporary value and replaced below when
// calling `rebuild_observer`.
observer: Box::new(futures::empty()) as Box<_>,
client,
network,
persistent_data,
keystore,
voter_commands_rx,
};
work.rebuild_observer();
work
}
/// Rebuilds the `self.observer` field using the current authority set
/// state. This method should be called when we know that the authority set
/// has changed (e.g. as signalled by a voter command).
fn rebuild_observer(&mut self) {
let set_id = self.persistent_data.authority_set.set_id();
let voters = Arc::new(self.persistent_data.authority_set.current_authorities());
// start global communication stream for the current set
let (global_in, _) = global_communication(
set_id,
&voters,
&self.client,
&self.network,
&self.keystore,
);
let last_finalized_number = self.client.info().chain.finalized_number;
// NOTE: since we are not using `round_communication` we have to
// manually note the round with the gossip validator, otherwise we won't
// relay round messages. we want all full nodes to contribute to vote
// availability.
let note_round = {
let network = self.network.clone();
let voters = voters.clone();
move |round| network.note_round(
crate::communication::Round(round),
crate::communication::SetId(set_id),
&*voters,
)
};
// create observer for the current set
let observer = grandpa_observer(
&self.client,
&self.persistent_data.authority_set,
&self.persistent_data.consensus_changes,
&voters,
last_finalized_number,
global_in,
note_round,
);
self.observer = Box::new(observer);
}
fn handle_voter_command(
&mut self,
command: VoterCommand<B::Hash, NumberFor<B>>,
) -> Result<(), Error> {
// the observer doesn't use the voter set state, but we need to
// update it on-disk in case we restart as validator in the future.
self.persistent_data.set_state = match command {
VoterCommand::Pause(reason) => {
info!(target: "afg", "Pausing old validator set: {}", reason);
let completed_rounds = self.persistent_data.set_state.read().completed_rounds();
let set_state = VoterSetState::Paused { completed_rounds };
#[allow(deprecated)]
crate::aux_schema::write_voter_set_state(&**self.client.backend(), &set_state)?;
set_state
},
VoterCommand::ChangeAuthorities(new) => {
// start the new authority set using the block where the
// set changed (not where the signal happened!) as the base.
let set_state = VoterSetState::live(
new.set_id,
&*self.persistent_data.authority_set.inner().read(),
(new.canon_hash, new.canon_number),
);
#[allow(deprecated)]
crate::aux_schema::write_voter_set_state(&**self.client.backend(), &set_state)?;
set_state
},
}.into();
self.rebuild_observer();
Ok(())
}
}
impl<B, N, E, Bk, RA> Future for ObserverWork<B, N, E, Bk, RA>
where
B: BlockT<Hash=H256>,
N: Network<B>,
N::In: Send + 'static,
NumberFor<B>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<B, Blake2Hasher> + Send + Sync + 'static,
Bk: Backend<B, Blake2Hasher> + 'static,
{
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.observer.poll() {
Ok(Async::NotReady) => {}
Ok(Async::Ready(())) => {
// observer commit stream doesn't conclude naturally; this could reasonably be an error.
return Ok(Async::Ready(()))
}
Err(CommandOrError::Error(e)) => {
// return inner observer error
return Err(e)
}
Err(CommandOrError::VoterCommand(command)) => {
// some command issued internally
self.handle_voter_command(command)?;
futures::task::current().notify();
}
}
match self.voter_commands_rx.poll() {
Ok(Async::NotReady) => {}
Err(_) => {
// the `voter_commands_rx` stream should not fail.
return Ok(Async::Ready(()))
}
Ok(Async::Ready(None)) => {
// the `voter_commands_rx` stream should never conclude since it's never closed.
return Ok(Async::Ready(()))
}
Ok(Async::Ready(Some(command))) => {
// some command issued externally
self.handle_voter_command(command)?;
futures::task::current().notify();
}
}
Ok(Async::NotReady)
}
}