Update exit-future and make sc-cli compile on wasm (#4289)

* updated exit-future (github repo)

* Switch to broadcast crate

* Migrate client/cli

* Switch exit-future to modernize branch

* Small changes

* Switch to cargo version and fix fg tests

* Revert "Small changes"

This reverts commit a488106805d220cb4aee9e46a71481424c6d87d5.
This commit is contained in:
Ashley
2019-12-04 18:35:33 +01:00
committed by GitHub
parent c071276187
commit a58e8a6e45
11 changed files with 105 additions and 76 deletions
+8 -8
View File
@@ -28,7 +28,7 @@ use chain_spec::{RuntimeGenesis, Extension};
use consensus_common::import_queue::ImportQueue;
use futures::{prelude::*, sync::mpsc};
use futures03::{
compat::{Compat, Future01CompatExt},
compat::Compat,
FutureExt as _, TryFutureExt as _,
StreamExt as _, TryStreamExt as _,
future::{select, Either}
@@ -880,7 +880,7 @@ ServiceBuilder<
Ok(())
})
.select(exit.clone())
.select(exit.clone().map(Ok).compat())
.then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(events));
}
@@ -902,7 +902,7 @@ ServiceBuilder<
);
Ok(())
})
.select(exit.clone())
.select(exit.clone().map(Ok).compat())
.then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(events));
@@ -967,7 +967,7 @@ ServiceBuilder<
);
Ok(())
}).select(exit.clone()).then(|_| Ok(()));
}).select(exit.clone().map(Ok).compat()).then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task));
// Periodically send the network state to the telemetry.
@@ -980,7 +980,7 @@ ServiceBuilder<
"state" => network_state,
);
Ok(())
}).select(exit.clone()).then(|_| Ok(()));
}).select(exit.clone().map(Ok).compat()).then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task_2));
// RPC
@@ -1054,7 +1054,7 @@ ServiceBuilder<
dht_event_tx,
)
.map_err(|_| ())
.select(exit.clone())
.select(exit.clone().map(Ok).compat())
.then(|_| Ok(()))));
let telemetry_connection_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>> = Default::default();
@@ -1099,7 +1099,7 @@ ServiceBuilder<
Ok(())
});
let _ = to_spawn_tx.unbounded_send(Box::new(future
.select(exit.clone())
.select(exit.clone().map(Ok).compat())
.then(|_| Ok(()))));
telemetry
});
@@ -1108,7 +1108,7 @@ ServiceBuilder<
if let Some(port) = config.grafana_port {
let future = select(
grafana_data_source::run_server(port).boxed(),
exit.clone().compat()
exit.clone()
).map(|either| match either {
Either::Left((result, _)) => result.map_err(|_| ()),
Either::Right(_) => Ok(())
+7 -4
View File
@@ -120,7 +120,8 @@ impl Executor<Box<dyn Future<Item = (), Error = ()> + Send>> for SpawnTaskHandle
&self,
future: Box<dyn Future<Item = (), Error = ()> + Send>,
) -> Result<(), futures::future::ExecuteError<Box<dyn Future<Item = (), Error = ()> + Send>>> {
let future = Box::new(future.select(self.on_exit.clone()).then(|_| Ok(())));
let exit = self.on_exit.clone().map(Ok).compat();
let future = Box::new(future.select(exit).then(|_| Ok(())));
if let Err(err) = self.sender.unbounded_send(future) {
let kind = futures::future::ExecuteErrorKind::Shutdown;
Err(futures::future::ExecuteError::new(kind, err.into_inner()))
@@ -236,7 +237,8 @@ where
}
fn spawn_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static) {
let task = task.select(self.on_exit()).then(|_| Ok(()));
let exit = self.on_exit().map(Ok).compat();
let task = task.select(exit).then(|_| Ok(()));
let _ = self.to_spawn_tx.unbounded_send(Box::new(task));
}
@@ -249,7 +251,8 @@ where
let _ = essential_failed.send(());
Ok(())
});
let task = essential_task.select(self.on_exit()).then(|_| Ok(()));
let exit = self.on_exit().map(Ok::<_, ()>).compat();
let task = essential_task.select(exit).then(|_| Ok(()));
let _ = self.to_spawn_tx.unbounded_send(Box::new(task));
}
@@ -503,7 +506,7 @@ impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Drop for
fn drop(&mut self) {
debug!(target: "service", "Substrate service shutdown");
if let Some(signal) = self.signal.take() {
signal.fire();
let _ = signal.fire();
}
}
}