diff --git a/substrate/core/service/src/components.rs b/substrate/core/service/src/components.rs index 600841f110..e34da42bcf 100644 --- a/substrate/core/service/src/components.rs +++ b/substrate/core/service/src/components.rs @@ -281,7 +281,6 @@ impl OffchainWorker for C where pub trait ServiceTrait: Deref> + Send - + Sync + 'static + StartRPC + MaintainTransactionPool @@ -290,7 +289,6 @@ pub trait ServiceTrait: impl ServiceTrait for T where T: Deref> + Send - + Sync + 'static + StartRPC + MaintainTransactionPool diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 7467f00f41..b4557343dc 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -85,11 +85,11 @@ pub struct Service { /// Sender for futures that must be spawned as background tasks. to_spawn_tx: mpsc::UnboundedSender + Send>>, /// Receiver for futures that must be spawned as background tasks. - to_spawn_rx: Mutex + Send>>>, + to_spawn_rx: mpsc::UnboundedReceiver + Send>>, /// List of futures to poll from `poll`. /// If spawning a background task is not possible, we instead push the task into this `Vec`. /// The elements must then be polled manually. - to_poll: Mutex + Send>>>, + to_poll: Vec + Send>>, /// Configuration of this Service pub config: FactoryFullConfiguration, _rpc: Box, @@ -478,8 +478,8 @@ impl Service { transaction_pool, signal: Some(signal), to_spawn_tx, - to_spawn_rx: Mutex::new(to_spawn_rx), - to_poll: Mutex::new(Vec::new()), + to_spawn_rx, + to_poll: Vec::new(), keystore, config, exit, @@ -556,24 +556,7 @@ impl Future for Service where Components: components::Co type Error = (); fn poll(&mut self) -> Poll { - Future::poll(&mut &*self) - } -} - -// Note that this implementation is totally unnecessary. It exists only because of tests. The tests -// should eventually be reworked, as it would make it possible to remove the `Mutex`es. that we -// lock here. -impl<'a, Components> Future for &'a Service where Components: components::Components { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll { - // The user is supposed to poll only one service, so it doesn't matter if we keep this - // mutex locked. - let mut to_poll = self.to_poll.lock(); - let mut to_spawn_rx = self.to_spawn_rx.lock(); - - while let Ok(Async::Ready(Some(task_to_spawn))) = to_spawn_rx.poll() { + while let Ok(Async::Ready(Some(task_to_spawn))) = self.to_spawn_rx.poll() { let executor = tokio_executor::DefaultExecutor::current(); if let Err(err) = executor.execute(task_to_spawn) { debug!( @@ -581,13 +564,13 @@ impl<'a, Components> Future for &'a Service where Components: compon "Failed to spawn background task: {:?}; falling back to manual polling", err ); - to_poll.push(err.into_future()); + self.to_poll.push(err.into_future()); } } // Polling all the `to_poll` futures. - while let Some(pos) = to_poll.iter_mut().position(|t| t.poll().map(|t| t.is_ready()).unwrap_or(true)) { - to_poll.remove(pos); + while let Some(pos) = self.to_poll.iter_mut().position(|t| t.poll().map(|t| t.is_ready()).unwrap_or(true)) { + self.to_poll.remove(pos); } // The service future never ends. diff --git a/substrate/core/service/test/src/lib.rs b/substrate/core/service/test/src/lib.rs index a871b1cb0d..5abd97fada 100644 --- a/substrate/core/service/test/src/lib.rs +++ b/substrate/core/service/test/src/lib.rs @@ -17,7 +17,7 @@ //! Service integration test utils. use std::iter; -use std::sync::Arc; +use std::sync::{Arc, Mutex, MutexGuard}; use std::net::Ipv4Addr; use std::time::Duration; use std::collections::HashMap; @@ -27,9 +27,7 @@ use tempdir::TempDir; use tokio::{runtime::Runtime, prelude::FutureExt}; use tokio::timer::Interval; use service::{ - Service, ServiceFactory, - Components, Configuration, FactoryFullConfiguration, FactoryChainSpec, @@ -46,22 +44,41 @@ const MAX_WAIT_TIME: Duration = Duration::from_secs(60 * 3); struct TestNet { runtime: Runtime, - authority_nodes: Vec<(u32, Arc, Multiaddr)>, - full_nodes: Vec<(u32, Arc, Multiaddr)>, - light_nodes: Vec<(u32, Arc, Multiaddr)>, + authority_nodes: Vec<(usize, SyncService, Multiaddr)>, + full_nodes: Vec<(usize, SyncService, Multiaddr)>, + light_nodes: Vec<(usize, SyncService, Multiaddr)>, chain_spec: FactoryChainSpec, base_port: u16, nodes: usize, } /// Wraps around an `Arc>` and implements `Future`. -struct ArcService(Arc); -impl Future for ArcService where T: std::ops::Deref> { +pub struct SyncService(Arc>); + +impl SyncService { + pub fn get(&self) -> MutexGuard { + self.0.lock().unwrap() + } +} + +impl Clone for SyncService { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl From for SyncService { + fn from(service: T) -> Self { + SyncService(Arc::new(Mutex::new(service))) + } +} + +impl> Future for SyncService { type Item = (); type Error = (); fn poll(&mut self) -> Poll { - Future::poll(&mut &**self.0) + self.0.lock().unwrap().poll() } } @@ -72,25 +89,31 @@ impl TestNet { light_predicate: LP, ) where - FP: Send + Sync + Fn(u32, &F::FullService) -> bool + 'static, - LP: Send + Sync + Fn(u32, &F::LightService) -> bool + 'static, + FP: Send + Fn(usize, &SyncService) -> bool + 'static, + LP: Send + Fn(usize, &SyncService) -> bool + 'static, { let full_nodes = self.full_nodes.clone(); let light_nodes = self.light_nodes.clone(); let interval = Interval::new_interval(Duration::from_millis(100)) .map_err(|_| ()) .for_each(move |_| { - let full_ready = full_nodes.iter().all(|&(ref id, ref service, _)| full_predicate(*id, service)); + let full_ready = full_nodes.iter().all(|&(ref id, ref service, _)| + full_predicate(*id, service) + ); + if !full_ready { return Ok(()); } - let light_ready = light_nodes.iter().all(|&(ref id, ref service, _)| light_predicate(*id, service)); - if !light_ready { - return Ok(()); - } + let light_ready = light_nodes.iter().all(|&(ref id, ref service, _)| + light_predicate(*id, service) + ); - Err(()) + if !light_ready { + Ok(()) + } else { + Err(()) + } }) .timeout(MAX_WAIT_TIME); @@ -103,7 +126,7 @@ impl TestNet { } fn node_config ( - index: u32, + index: usize, spec: &FactoryChainSpec, role: Roles, key_seed: Option, @@ -173,17 +196,27 @@ fn node_config ( } } -impl TestNet { - fn new(temp: &TempDir, spec: FactoryChainSpec, full: u32, light: u32, authorities: Vec, base_port: u16) -> TestNet { - let _ = ::env_logger::try_init(); - ::fdlimit::raise_fd_limit(); +impl TestNet where + F::FullService: Future, + F::LightService: Future +{ + fn new( + temp: &TempDir, + spec: FactoryChainSpec, + full: usize, + light: usize, + authorities: Vec, + base_port: u16 + ) -> TestNet { + let _ = env_logger::try_init(); + fdlimit::raise_fd_limit(); let runtime = Runtime::new().expect("Error creating tokio runtime"); let mut net = TestNet { runtime, authority_nodes: Default::default(), full_nodes: Default::default(), light_nodes: Default::default(), - chain_spec: spec.clone(), + chain_spec: spec, base_port, nodes: 0, }; @@ -191,51 +224,61 @@ impl TestNet { net } - fn insert_nodes(&mut self, temp: &TempDir, full: u32, light: u32, authorities: Vec) { + fn insert_nodes(&mut self, temp: &TempDir, full: usize, light: usize, authorities: Vec) { let mut nodes = self.nodes; let base_port = self.base_port; - let spec = self.chain_spec.clone(); + let spec = &self.chain_spec; let executor = self.runtime.executor(); self.authority_nodes.extend(authorities.iter().enumerate().map(|(index, key)| { - let node_config = node_config::(index as u32, &spec, Roles::AUTHORITY, Some(key.clone()), base_port, &temp); + let node_config = node_config::( + index, + &spec, + Roles::AUTHORITY, + Some(key.clone()), + base_port, + &temp, + ); let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); - let service = Arc::new(F::new_full(node_config) - .expect("Error creating test node service")); - executor.spawn(ArcService(service.clone())); - let addr = addr.with(multiaddr::Protocol::P2p(service.network().local_peer_id().into())); - ((index + nodes) as u32, service, addr) + let service = SyncService::from(F::new_full(node_config).expect("Error creating test node service")); + + executor.spawn(service.clone()); + let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into())); + ((index + nodes), service, addr) })); nodes += authorities.len(); - self.full_nodes.extend((nodes..nodes + full as usize).map(|index| { - let node_config = node_config::(index as u32, &spec, Roles::FULL, None, base_port, &temp); + self.full_nodes.extend((nodes..nodes + full).map(|index| { + let node_config = node_config::(index, &spec, Roles::FULL, None, base_port, &temp); let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); - let service = Arc::new(F::new_full(node_config) - .expect("Error creating test node service")); - executor.spawn(ArcService(service.clone())); - let addr = addr.with(multiaddr::Protocol::P2p(service.network().local_peer_id().into())); - (index as u32, service, addr) - })); - nodes += full as usize; + let service = SyncService::from(F::new_full(node_config).expect("Error creating test node service")); - self.light_nodes.extend((nodes..nodes + light as usize).map(|index| { - let node_config = node_config::(index as u32, &spec, Roles::LIGHT, None, base_port, &temp); - let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); - let service = Arc::new(F::new_light(node_config) - .expect("Error creating test node service")); - executor.spawn(ArcService(service.clone())); - let addr = addr.with(multiaddr::Protocol::P2p(service.network().local_peer_id().into())); - (index as u32, service, addr) + executor.spawn(service.clone()); + let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into())); + (index, service, addr) })); - nodes += light as usize; + nodes += full; + + self.light_nodes.extend((nodes..nodes + light).map(|index| { + let node_config = node_config::(index, &spec, Roles::LIGHT, None, base_port, &temp); + let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); + let service = SyncService::from(F::new_light(node_config).expect("Error creating test node service")); + + executor.spawn(service.clone()); + let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into())); + (index, service, addr) + })); + nodes += light; self.nodes = nodes; } } -pub fn connectivity(spec: FactoryChainSpec) { - const NUM_FULL_NODES: u32 = 5; - const NUM_LIGHT_NODES: u32 = 5; +pub fn connectivity(spec: FactoryChainSpec) where + F::FullService: Future, + F::LightService: Future, +{ + const NUM_FULL_NODES: usize = 5; + const NUM_LIGHT_NODES: usize = 5; { let temp = TempDir::new("substrate-connectivity-test").expect("Error creating test dir"); let runtime = { @@ -250,15 +293,15 @@ pub fn connectivity(spec: FactoryChainSpec) { info!("Checking star topology"); let first_address = network.full_nodes[0].2.clone(); for (_, service, _) in network.full_nodes.iter().skip(1) { - service.network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); + service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); } for (_, service, _) in network.light_nodes.iter() { - service.network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); + service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); } network.run_until_all_full( - |_index, service| service.network().peers_debug_info().len() == NUM_FULL_NODES as usize - 1 - + NUM_LIGHT_NODES as usize, - |_index, service| service.network().peers_debug_info().len() == NUM_FULL_NODES as usize, + |_index, service| service.get().network().peers_debug_info().len() == NUM_FULL_NODES - 1 + + NUM_LIGHT_NODES, + |_index, service| service.get().network().peers_debug_info().len() == NUM_FULL_NODES, ); network.runtime }; @@ -280,43 +323,40 @@ pub fn connectivity(spec: FactoryChainSpec) { ); info!("Checking linked topology"); let mut address = network.full_nodes[0].2.clone(); - let max_nodes = ::std::cmp::max(NUM_FULL_NODES, NUM_LIGHT_NODES); + let max_nodes = std::cmp::max(NUM_FULL_NODES, NUM_LIGHT_NODES); for i in 0..max_nodes { if i != 0 { - if let Some((_, service, node_id)) = network.full_nodes.get(i as usize) { - service.network().add_reserved_peer(address.to_string()).expect("Error adding reserved peer"); + if let Some((_, service, node_id)) = network.full_nodes.get(i) { + service.get().network().add_reserved_peer(address.to_string()).expect("Error adding reserved peer"); address = node_id.clone(); } } - if let Some((_, service, node_id)) = network.light_nodes.get(i as usize) { - service.network().add_reserved_peer(address.to_string()).expect("Error adding reserved peer"); + if let Some((_, service, node_id)) = network.light_nodes.get(i) { + service.get().network().add_reserved_peer(address.to_string()).expect("Error adding reserved peer"); address = node_id.clone(); } } network.run_until_all_full( - |_index, service| service.network().peers_debug_info().len() == NUM_FULL_NODES as usize - 1 - + NUM_LIGHT_NODES as usize, - |_index, service| service.network().peers_debug_info().len() == NUM_FULL_NODES as usize, + |_index, service| service.get().network().peers_debug_info().len() == NUM_FULL_NODES - 1 + + NUM_LIGHT_NODES, + |_index, service| service.get().network().peers_debug_info().len() == NUM_FULL_NODES, ); } temp.close().expect("Error removing temp dir"); } } -pub fn sync( - spec: FactoryChainSpec, - mut block_factory: B, - mut extrinsic_factory: E, -) -where +pub fn sync(spec: FactoryChainSpec, mut block_factory: B, mut extrinsic_factory: E) where F: ServiceFactory, - B: FnMut(&F::FullService) -> ImportBlock, - E: FnMut(&F::FullService) -> FactoryExtrinsic, + F::FullService: Future, + F::LightService: Future, + B: FnMut(&SyncService) -> ImportBlock, + E: FnMut(&SyncService) -> FactoryExtrinsic, { - const NUM_FULL_NODES: u32 = 10; - const NUM_LIGHT_NODES: u32 = 10; - const NUM_BLOCKS: u32 = 512; + const NUM_FULL_NODES: usize = 10; + const NUM_LIGHT_NODES: usize = 10; + const NUM_BLOCKS: usize = 512; let temp = TempDir::new("substrate-sync-test").expect("Error creating test dir"); let mut network = TestNet::::new( &temp, @@ -329,45 +369,50 @@ where info!("Checking block sync"); let first_address = { let first_service = &network.full_nodes[0].1; + let client = first_service.get().client(); for i in 0 .. NUM_BLOCKS { if i % 128 == 0 { info!("Generating #{}", i); } let import_data = block_factory(&first_service); - first_service.client().import_block(import_data, HashMap::new()).expect("Error importing test block"); + client.import_block(import_data, HashMap::new()).expect("Error importing test block"); } network.full_nodes[0].2.clone() }; + info!("Running sync"); for (_, service, _) in network.full_nodes.iter().skip(1) { - service.network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); + service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); } for (_, service, _) in network.light_nodes.iter() { - service.network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); + service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); } network.run_until_all_full( |_index, service| - service.client().info().chain.best_number == NUM_BLOCKS.into(), + service.get().client().info().chain.best_number == (NUM_BLOCKS as u32).into(), |_index, service| - service.client().info().chain.best_number == NUM_BLOCKS.into(), + service.get().client().info().chain.best_number == (NUM_BLOCKS as u32).into(), ); + info!("Checking extrinsic propagation"); let first_service = network.full_nodes[0].1.clone(); - let best_block = BlockId::number(first_service.client().info().chain.best_number); - first_service.transaction_pool().submit_one(&best_block, extrinsic_factory(&first_service)).unwrap(); + let best_block = BlockId::number(first_service.get().client().info().chain.best_number); + let extrinsic = extrinsic_factory(&first_service); + first_service.get().transaction_pool().submit_one(&best_block, extrinsic).unwrap(); network.run_until_all_full( - |_index, service| service.transaction_pool().ready().count() == 1, + |_index, service| service.get().transaction_pool().ready().count() == 1, |_index, _service| true, ); } -pub fn consensus(spec: FactoryChainSpec, authorities: Vec) - where - F: ServiceFactory, +pub fn consensus(spec: FactoryChainSpec, authorities: Vec) where + F: ServiceFactory, + F::FullService: Future, + F::LightService: Future, { - const NUM_FULL_NODES: u32 = 10; - const NUM_LIGHT_NODES: u32 = 0; - const NUM_BLOCKS: u32 = 10; // 10 * 2 sec block production time = ~20 seconds + const NUM_FULL_NODES: usize = 10; + const NUM_LIGHT_NODES: usize = 0; + const NUM_BLOCKS: usize = 10; // 10 * 2 sec block production time = ~20 seconds let temp = TempDir::new("substrate-conensus-test").expect("Error creating test dir"); let mut network = TestNet::::new( &temp, @@ -377,35 +422,37 @@ pub fn consensus(spec: FactoryChainSpec, authorities: Vec) authorities, 30600, ); + info!("Checking consensus"); let first_address = network.authority_nodes[0].2.clone(); for (_, service, _) in network.full_nodes.iter() { - service.network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); + service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); } for (_, service, _) in network.light_nodes.iter() { - service.network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); + service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); } for (_, service, _) in network.authority_nodes.iter().skip(1) { - service.network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); + service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); } network.run_until_all_full( |_index, service| - service.client().info().chain.finalized_number >= (NUM_BLOCKS / 2).into(), + service.get().client().info().chain.finalized_number >= (NUM_BLOCKS as u32 / 2).into(), |_index, service| - service.client().info().chain.best_number >= (NUM_BLOCKS / 2).into(), + service.get().client().info().chain.best_number >= (NUM_BLOCKS as u32 / 2).into(), ); + info!("Adding more peers"); network.insert_nodes(&temp, NUM_FULL_NODES / 2, NUM_LIGHT_NODES / 2, vec![]); for (_, service, _) in network.full_nodes.iter() { - service.network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); + service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); } for (_, service, _) in network.light_nodes.iter() { - service.network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); + service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); } network.run_until_all_full( |_index, service| - service.client().info().chain.finalized_number >= NUM_BLOCKS.into(), + service.get().client().info().chain.finalized_number >= (NUM_BLOCKS as u32).into(), |_index, service| - service.client().info().chain.best_number >= NUM_BLOCKS.into(), + service.get().client().info().chain.best_number >= (NUM_BLOCKS as u32).into(), ); } diff --git a/substrate/node/cli/src/service.rs b/substrate/node/cli/src/service.rs index e13c5b901d..e27baca8e6 100644 --- a/substrate/node/cli/src/service.rs +++ b/substrate/node/cli/src/service.rs @@ -229,6 +229,7 @@ mod tests { use finality_tracker; use keyring::{ed25519::Keyring as AuthorityKeyring, sr25519::Keyring as AccountKeyring}; use substrate_service::ServiceFactory; + use service_test::SyncService; use crate::service::Factory; #[cfg(feature = "rhd")] @@ -264,8 +265,13 @@ mod tests { auxiliary: Vec::new(), } }; - let extrinsic_factory = |service: &::FullService| { - let payload = (0, Call::Balances(BalancesCall::transfer(RawAddress::Id(bob.public().0.into()), 69.into())), Era::immortal(), service.client().genesis_hash()); + let extrinsic_factory = |service: &SyncService<::FullService>| { + let payload = ( + 0, + Call::Balances(BalancesCall::transfer(RawAddress::Id(bob.public().0.into()), 69.into())), + Era::immortal(), + service.client().genesis_hash() + ); let signature = alice.sign(&payload.encode()).into(); let id = alice.public().0.into(); let xt = UncheckedExtrinsic { @@ -275,7 +281,11 @@ mod tests { let v: Vec = Decode::decode(&mut xt.as_slice()).unwrap(); OpaqueExtrinsic(v) }; - service_test::sync::(chain_spec::integration_test_config(), block_factory, extrinsic_factory); + service_test::sync::( + chain_spec::integration_test_config(), + block_factory, + extrinsic_factory, + ); } #[test] @@ -285,9 +295,14 @@ mod tests { let alice = Arc::new(AuthorityKeyring::Alice.pair()); let mut slot_num = 1u64; - let block_factory = |service: &::FullService| { - let mut inherent_data = service.config.custom.inherent_data_providers - .create_inherent_data().unwrap(); + let block_factory = |service: &SyncService<::FullService>| { + let service = service.get(); + let mut inherent_data = service + .config + .custom + .inherent_data_providers + .create_inherent_data() + .expect("Creates inherent data."); inherent_data.replace_data(finality_tracker::INHERENT_IDENTIFIER, &1u64); inherent_data.replace_data(timestamp::INHERENT_IDENTIFIER, &(slot_num * 10)); @@ -297,13 +312,14 @@ mod tests { client: service.client(), transaction_pool: service.transaction_pool(), }); + let mut digest = Digest::::default(); digest.push(>::aura_pre_digest(slot_num * 10 / 2)); let proposer = proposer_factory.init(&parent_header).unwrap(); let new_block = proposer.propose( inherent_data, digest, - ::std::time::Duration::from_secs(1), + std::time::Duration::from_secs(1), ).expect("Error making test block"); let (new_header, new_body) = new_block.deconstruct(); @@ -333,11 +349,11 @@ mod tests { let charlie = Arc::new(AccountKeyring::Charlie.pair()); let mut index = 0; - let extrinsic_factory = |service: &::FullService| { + let extrinsic_factory = |service: &SyncService<::FullService>| { let amount = 1000; let to = AddressPublic::from_raw(bob.public().0); let from = AddressPublic::from_raw(charlie.public().0); - let genesis_hash = service.client().block_hash(0).unwrap().unwrap(); + let genesis_hash = service.get().client().block_hash(0).unwrap().unwrap(); let signer = charlie.clone(); let function = Call::Balances(BalancesCall::transfer(to.into(), amount));