Remove Mutex from core-service (#2961)

* Remove `Mutex` from `core-service`

* Fixes sync test
This commit is contained in:
Bastian Köcher
2019-06-27 10:07:34 +02:00
committed by GitHub
parent 3f142d0561
commit 23ea5d1795
4 changed files with 179 additions and 135 deletions
+8 -25
View File
@@ -85,11 +85,11 @@ pub struct Service<Components: components::Components> {
/// Sender for futures that must be spawned as background tasks.
to_spawn_tx: mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>,
/// Receiver for futures that must be spawned as background tasks.
to_spawn_rx: Mutex<mpsc::UnboundedReceiver<Box<dyn Future<Item = (), Error = ()> + Send>>>,
to_spawn_rx: mpsc::UnboundedReceiver<Box<dyn Future<Item = (), Error = ()> + Send>>,
/// List of futures to poll from `poll`.
/// If spawning a background task is not possible, we instead push the task into this `Vec`.
/// The elements must then be polled manually.
to_poll: Mutex<Vec<Box<dyn Future<Item = (), Error = ()> + Send>>>,
to_poll: Vec<Box<dyn Future<Item = (), Error = ()> + Send>>,
/// Configuration of this Service
pub config: FactoryFullConfiguration<Components::Factory>,
_rpc: Box<dyn std::any::Any + Send + Sync>,
@@ -478,8 +478,8 @@ impl<Components: components::Components> Service<Components> {
transaction_pool,
signal: Some(signal),
to_spawn_tx,
to_spawn_rx: Mutex::new(to_spawn_rx),
to_poll: Mutex::new(Vec::new()),
to_spawn_rx,
to_poll: Vec::new(),
keystore,
config,
exit,
@@ -556,24 +556,7 @@ impl<Components> Future for Service<Components> where Components: components::Co
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Future::poll(&mut &*self)
}
}
// Note that this implementation is totally unnecessary. It exists only because of tests. The tests
// should eventually be reworked, as it would make it possible to remove the `Mutex`es. that we
// lock here.
impl<'a, Components> Future for &'a Service<Components> where Components: components::Components {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// The user is supposed to poll only one service, so it doesn't matter if we keep this
// mutex locked.
let mut to_poll = self.to_poll.lock();
let mut to_spawn_rx = self.to_spawn_rx.lock();
while let Ok(Async::Ready(Some(task_to_spawn))) = to_spawn_rx.poll() {
while let Ok(Async::Ready(Some(task_to_spawn))) = self.to_spawn_rx.poll() {
let executor = tokio_executor::DefaultExecutor::current();
if let Err(err) = executor.execute(task_to_spawn) {
debug!(
@@ -581,13 +564,13 @@ impl<'a, Components> Future for &'a Service<Components> where Components: compon
"Failed to spawn background task: {:?}; falling back to manual polling",
err
);
to_poll.push(err.into_future());
self.to_poll.push(err.into_future());
}
}
// Polling all the `to_poll` futures.
while let Some(pos) = to_poll.iter_mut().position(|t| t.poll().map(|t| t.is_ready()).unwrap_or(true)) {
to_poll.remove(pos);
while let Some(pos) = self.to_poll.iter_mut().position(|t| t.poll().map(|t| t.is_ready()).unwrap_or(true)) {
self.to_poll.remove(pos);
}
// The service future never ends.