node: fix shutdown (#1308)

* node: remove grandpa authority flags

* node: exit-guard grandpa and aura spawned futures

* node: wait for futures to stop running on shutdown

* core: run connectivity tests on same ports

* core: pass on_exit future when starting aura and grandpa

* node: add issue number to todo

* core: fix aura and grandpa tests
This commit is contained in:
André Silva
2018-12-21 16:30:45 +00:00
committed by Robert Habermeier
parent ef8b94656e
commit f8f932d123
7 changed files with 54 additions and 28 deletions
+8 -5
View File
@@ -179,16 +179,15 @@ pub fn start_aura_thread<B, C, E, I, SO, Error>(
}
};
runtime.spawn(start_aura(
let _ = runtime.block_on(start_aura(
slot_duration,
local_key,
client,
block_import,
env,
sync_oracle,
on_exit,
));
runtime.block_on(on_exit).expect("Exit future should not fail");
});
}
@@ -200,6 +199,7 @@ pub fn start_aura<B, C, E, I, SO, Error>(
block_import: Arc<I>,
env: Arc<E>,
sync_oracle: SO,
on_exit: impl Future<Item=(),Error=()>,
) -> impl Future<Item=(),Error=()> where
B: Block,
C: Authorities<B> + ChainHead<B>,
@@ -352,7 +352,7 @@ pub fn start_aura<B, C, E, I, SO, Error>(
})
};
future::loop_fn((), move |()| {
let work = future::loop_fn((), move |()| {
let authorship_task = ::std::panic::AssertUnwindSafe(make_authorship());
authorship_task.catch_unwind().then(|res| {
match res {
@@ -369,7 +369,9 @@ pub fn start_aura<B, C, E, I, SO, Error>(
Ok(future::Loop::Continue(()))
})
})
});
work.select(on_exit).then(|_| Ok(()))
}
// a header which has been checked
@@ -760,6 +762,7 @@ mod tests {
client,
environ.clone(),
DummyOracle,
futures::empty(),
);
runtime.spawn(aura);
+2 -1
View File
@@ -1186,6 +1186,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
config: Config,
link: LinkHalf<B, E, Block, RA>,
network: N,
on_exit: impl Future<Item=(),Error=()> + Send + 'static,
) -> ::client::error::Result<impl Future<Item=(),Error=()> + Send + 'static> where
Block::Hash: Ord,
B: Backend<Block, Blake2Hasher> + 'static,
@@ -1312,5 +1313,5 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
}))
}).map_err(|e| warn!("GRANDPA Voter failed: {:?}", e));
Ok(voter_work)
Ok(voter_work.select(on_exit).then(|_| Ok(())))
}
@@ -376,6 +376,7 @@ fn finalize_3_voters_no_observers() {
},
link,
MessageRouting::new(net.clone(), peer_id),
futures::empty(),
).expect("all in order with client and network");
assert_send(&voter);
@@ -436,6 +437,7 @@ fn finalize_3_voters_1_observer() {
},
link,
MessageRouting::new(net.clone(), peer_id),
futures::empty(),
).expect("all in order with client and network");
runtime.spawn(voter);
@@ -592,6 +594,7 @@ fn transition_3_voters_twice_1_observer() {
},
link,
MessageRouting::new(net.clone(), peer_id),
futures::empty(),
).expect("all in order with client and network");
runtime.spawn(voter);
+7 -3
View File
@@ -107,7 +107,7 @@ pub struct Service<Components: components::Components> {
/// Configuration of this Service
pub config: FactoryFullConfiguration<Components::Factory>,
_rpc: Box<::std::any::Any + Send + Sync>,
_telemetry: Option<tel::Telemetry>,
_telemetry: Option<Arc<tel::Telemetry>>,
}
/// Creates bare client without any networking.
@@ -263,7 +263,7 @@ impl<Components: components::Components> Service<Components> {
let impl_name = config.impl_name.to_owned();
let version = version.clone();
let chain_name = config.chain_spec.name().to_owned();
Some(tel::init_telemetry(tel::TelemetryConfig {
Some(Arc::new(tel::init_telemetry(tel::TelemetryConfig {
url: url,
on_connect: Box::new(move || {
telemetry!("system.connected";
@@ -276,7 +276,7 @@ impl<Components: components::Components> Service<Components> {
"authority" => is_authority
);
}),
}))
})))
},
None => None,
};
@@ -306,6 +306,10 @@ impl<Components: components::Components> Service<Components> {
None
}
}
/// Returns a shared handle to this service's telemetry, if one is stored.
///
/// The result is a clone of the `Arc` held in `_telemetry`, so a caller can
/// keep the telemetry value alive independently of the `Service` (for
/// example, across dropping the service). Returns `None` when `_telemetry`
/// is `None`.
pub fn telemetry(&self) -> Option<Arc<tel::Telemetry>> {
	// Cloning `Option<Arc<_>>` only bumps the reference count; the
	// underlying telemetry value is not copied.
	self._telemetry.clone()
}
}
impl<Components> Service<Components> where Components: components::Components {
+8 -4
View File
@@ -33,7 +33,7 @@ use std::iter;
use std::sync::Arc;
use std::net::Ipv4Addr;
use std::time::Duration;
use futures::Stream;
use futures::{Future, Stream};
use tempdir::TempDir;
use tokio::runtime::Runtime;
use tokio::timer::Interval;
@@ -188,7 +188,7 @@ pub fn connectivity<F: ServiceFactory, Inherent>(spec: FactoryChainSpec<F>) wher
const NUM_NODES: u32 = 10;
{
let temp = TempDir::new("substrate-connectivity-test").expect("Error creating test dir");
{
let runtime = {
let mut network = TestNet::<F>::new(&temp, spec.clone(), NUM_NODES, 0, vec![], 30400);
info!("Checking star topology");
let first_address = network.full_nodes[0].1.network().node_id().expect("No node address");
@@ -198,13 +198,17 @@ pub fn connectivity<F: ServiceFactory, Inherent>(spec: FactoryChainSpec<F>) wher
network.run_until_all_full(|_index, service|
service.network().status().num_peers == NUM_NODES as usize - 1
);
}
network.runtime
};
runtime.shutdown_on_idle().wait().expect("Error shutting down runtime");
temp.close().expect("Error removing temp dir");
}
{
let temp = TempDir::new("substrate-connectivity-test").expect("Error creating test dir");
{
let mut network = TestNet::<F>::new(&temp, spec, NUM_NODES, 0, vec![], 30500);
let mut network = TestNet::<F>::new(&temp, spec, NUM_NODES, 0, vec![], 30400);
info!("Checking linked topology");
let mut address = network.full_nodes[0].1.network().node_id().expect("No node address");
for (_, service) in network.full_nodes.iter().skip(1) {
+13 -3
View File
@@ -50,6 +50,7 @@ pub mod chain_spec;
mod service;
mod params;
use tokio::prelude::Future;
use tokio::runtime::Runtime;
pub use cli::{VersionInfo, IntoExit};
use substrate_service::{ServiceFactory, Roles as ServiceRoles};
@@ -136,8 +137,8 @@ pub fn run<I, T, E>(args: I, exit: E, version: cli::VersionInfo) -> error::Resul
let mut runtime = Runtime::new()?;
let executor = runtime.executor();
match config.roles == ServiceRoles::LIGHT {
true => run_until_exit(&mut runtime, service::Factory::new_light(config, executor)?, exit)?,
false => run_until_exit(&mut runtime, service::Factory::new_full(config, executor)?, exit)?,
true => run_until_exit(runtime, service::Factory::new_light(config, executor)?, exit)?,
false => run_until_exit(runtime, service::Factory::new_full(config, executor)?, exit)?,
}
}
}
@@ -145,7 +146,7 @@ pub fn run<I, T, E>(args: I, exit: E, version: cli::VersionInfo) -> error::Resul
}
fn run_until_exit<T, C, E>(
runtime: &mut Runtime,
mut runtime: Runtime,
service: T,
e: E,
) -> error::Result<()>
@@ -161,5 +162,14 @@ fn run_until_exit<T, C, E>(
let _ = runtime.block_on(e.into_exit());
exit_send.fire();
// we eagerly drop the service so that the internal exit future is fired,
// but we need to keep holding a reference to the global telemetry guard
let _telemetry = service.telemetry();
drop(service);
// TODO [andre]: timeout this future #1318
let _ = runtime.shutdown_on_idle().wait();
Ok(())
}
+13 -12
View File
@@ -19,19 +19,20 @@
//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
use std::sync::Arc;
use transaction_pool::{self, txpool::{Pool as TransactionPool}};
use node_runtime::{GenesisConfig, RuntimeApi};
use std::time::Duration;
use client;
use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration, NothingExtra};
use grandpa;
use node_executor;
use primitives::ed25519::Pair;
use node_primitives::{Block, InherentData};
use node_runtime::{GenesisConfig, RuntimeApi};
use substrate_service::{
FactoryFullConfiguration, LightComponents, FullComponents, FullBackend,
FullClient, LightClient, LightBackend, FullExecutor, LightExecutor, TaskExecutor
};
use node_executor;
use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration, NothingExtra};
use primitives::ed25519::Pair;
use client;
use std::time::Duration;
use grandpa;
use transaction_pool::{self, txpool::{Pool as TransactionPool}};
construct_simple_protocol! {
/// Demo protocol attachment for substrate.
@@ -89,12 +90,13 @@ construct_service_factory! {
block_import.clone(),
proposer,
service.network(),
service.on_exit(),
));
info!("Running Grandpa session as Authority {}", key.public());
}
let voter = grandpa::run_grandpa(
executor.spawn(grandpa::run_grandpa(
grandpa::Config {
local_key,
gossip_duration: Duration::new(4, 0), // FIXME: make this available through chainspec?
@@ -102,9 +104,8 @@ construct_service_factory! {
},
link_half,
grandpa::NetworkBridge::new(service.network()),
)?;
executor.spawn(voter);
service.on_exit(),
)?);
Ok(service)
}