mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
Remove usage of loop_fn in the GRANDPA tests (#3397)
This commit is contained in:
committed by
Bastian Köcher
parent
ea8831b15f
commit
7e2dba3e3a
@@ -1102,6 +1102,8 @@ fn voter_persists_its_votes() {
|
||||
let client = net.peer(0).client().clone();
|
||||
let net = Arc::new(Mutex::new(net));
|
||||
|
||||
// channel between the voter and the main controller.
|
||||
// sending a message on the `voter_tx` restarts the voter.
|
||||
let (voter_tx, voter_rx) = mpsc::unbounded::<()>();
|
||||
|
||||
let mut keystore_paths = Vec::new();
|
||||
@@ -1110,61 +1112,79 @@ fn voter_persists_its_votes() {
|
||||
// channel. whenever a message is received the voter is restarted. when the
|
||||
// sender is dropped the voter is stopped.
|
||||
{
|
||||
let net = net.clone();
|
||||
let client = client.clone();
|
||||
|
||||
let (keystore, keystore_path) = create_keystore(peers[0]);
|
||||
keystore_paths.push(keystore_path);
|
||||
|
||||
let voter = future::loop_fn(voter_rx, move |rx| {
|
||||
let (_block_import, _, _, _, link) = net.lock().make_block_import(client.clone());
|
||||
let link = link.lock().take().unwrap();
|
||||
struct ResettableVoter {
|
||||
voter: Box<dyn Future<Item = (), Error = ()> + Send>,
|
||||
voter_rx: mpsc::UnboundedReceiver<()>,
|
||||
net: Arc<Mutex<GrandpaTestNet>>,
|
||||
client: PeersClient,
|
||||
keystore: KeyStorePtr,
|
||||
}
|
||||
|
||||
let grandpa_params = GrandpaParams {
|
||||
config: Config {
|
||||
gossip_duration: TEST_GOSSIP_DURATION,
|
||||
justification_period: 32,
|
||||
keystore: Some(keystore.clone()),
|
||||
name: Some(format!("peer#{}", 0)),
|
||||
},
|
||||
link,
|
||||
network: net.lock().peers[0].network_service().clone(),
|
||||
inherent_data_providers: InherentDataProviders::new(),
|
||||
on_exit: Exit,
|
||||
telemetry_on_connect: None,
|
||||
};
|
||||
impl Future for ResettableVoter {
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
|
||||
let voter = run_grandpa_voter(grandpa_params)
|
||||
.expect("all in order with client and network")
|
||||
.then(move |r| {
|
||||
// we need to keep the block_import alive since it owns the
|
||||
// sender for the voter commands channel, if that gets dropped
|
||||
// then the voter will stop
|
||||
drop(_block_import);
|
||||
r
|
||||
});
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self.voter.poll() {
|
||||
Ok(Async::Ready(())) | Err(_) => panic!("error in the voter"),
|
||||
Ok(Async::NotReady) => {},
|
||||
}
|
||||
|
||||
voter.select2(rx.into_future()).then(|res| match res {
|
||||
Ok(future::Either::A(x)) => {
|
||||
panic!("voter stopped unexpectedly: {:?}", x);
|
||||
},
|
||||
Ok(future::Either::B(((Some(()), rx), _))) => {
|
||||
Ok(future::Loop::Continue(rx))
|
||||
},
|
||||
Ok(future::Either::B(((None, _), _))) => {
|
||||
Ok(future::Loop::Break(()))
|
||||
},
|
||||
Err(future::Either::A(err)) => {
|
||||
panic!("unexpected error: {:?}", err);
|
||||
},
|
||||
Err(future::Either::B(..)) => {
|
||||
// voter_rx dropped, stop the voter.
|
||||
Ok(future::Loop::Break(()))
|
||||
},
|
||||
})
|
||||
match self.voter_rx.poll() {
|
||||
Err(_) | Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
|
||||
Ok(Async::NotReady) => {}
|
||||
Ok(Async::Ready(Some(()))) => {
|
||||
let (_block_import, _, _, _, link) =
|
||||
self.net.lock().make_block_import(self.client.clone());
|
||||
let link = link.lock().take().unwrap();
|
||||
|
||||
let grandpa_params = GrandpaParams {
|
||||
config: Config {
|
||||
gossip_duration: TEST_GOSSIP_DURATION,
|
||||
justification_period: 32,
|
||||
keystore: Some(self.keystore.clone()),
|
||||
name: Some(format!("peer#{}", 0)),
|
||||
},
|
||||
link,
|
||||
network: self.net.lock().peers[0].network_service().clone(),
|
||||
inherent_data_providers: InherentDataProviders::new(),
|
||||
on_exit: Exit,
|
||||
telemetry_on_connect: None,
|
||||
};
|
||||
|
||||
let voter = run_grandpa_voter(grandpa_params)
|
||||
.expect("all in order with client and network")
|
||||
.then(move |r| {
|
||||
// we need to keep the block_import alive since it owns the
|
||||
// sender for the voter commands channel, if that gets dropped
|
||||
// then the voter will stop
|
||||
drop(_block_import);
|
||||
r
|
||||
});
|
||||
|
||||
self.voter = Box::new(voter);
|
||||
// notify current task in order to poll the voter
|
||||
futures::task::current().notify();
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
|
||||
// we create a "dummy" voter by setting it to `empty` and triggering the `tx`.
|
||||
// this way, the `ResettableVoter` will reset its `voter` field to a value ASAP.
|
||||
voter_tx.unbounded_send(()).unwrap();
|
||||
runtime.spawn(ResettableVoter {
|
||||
voter: Box::new(futures::future::empty()),
|
||||
voter_rx,
|
||||
net: net.clone(),
|
||||
client: client.clone(),
|
||||
keystore,
|
||||
});
|
||||
|
||||
runtime.spawn(voter);
|
||||
}
|
||||
|
||||
let (exit_tx, exit_rx) = futures::sync::oneshot::channel::<()>();
|
||||
|
||||
Reference in New Issue
Block a user