Service various cleanups (#3238)

* Remove generic from sign()

* Remove mandatory RuntimeGenesis trait req

* Remove requirement from Configuration

* Relax trait bounds of core/cli

* Move method

* The config field is no longer public

* Remove Components from bounds of functions

* Implement DerefMut for LightComponents

* Implement Executor for Full/LightComponents

* Fix bad merge

* Fix forgotten config()

* Fix build
This commit is contained in:
Pierre Krieger
2019-08-08 16:14:30 +02:00
committed by Gavin Wood
parent fe18b4055d
commit bafc7202ca
8 changed files with 133 additions and 88 deletions
+3 -1
View File
@@ -127,7 +127,7 @@ impl<G> Clone for ChainSpec<G> {
}
}
impl<G: RuntimeGenesis> ChainSpec<G> {
impl<G> ChainSpec<G> {
/// A list of bootnode addresses.
pub fn boot_nodes(&self) -> &[String] {
&self.spec.boot_nodes
@@ -215,7 +215,9 @@ impl<G: RuntimeGenesis> ChainSpec<G> {
genesis: GenesisSource::Factory(constructor),
}
}
}
impl<G: RuntimeGenesis> ChainSpec<G> {
/// Dump to json string.
pub fn to_json(self, raw: bool) -> Result<String, String> {
#[derive(Serialize, Deserialize)]
+26
View File
@@ -506,6 +506,16 @@ impl<Factory: ServiceFactory> Future for FullComponents<Factory> {
}
}
impl<Factory: ServiceFactory> Executor<Box<dyn Future<Item = (), Error = ()> + Send>>
for FullComponents<Factory> {
fn execute(
&self,
future: Box<dyn Future<Item = (), Error = ()> + Send>
) -> Result<(), futures::future::ExecuteError<Box<dyn Future<Item = (), Error = ()> + Send>>> {
self.service.execute(future)
}
}
impl<Factory: ServiceFactory> Components for FullComponents<Factory> {
type Factory = Factory;
type Executor = FullExecutor<Factory>;
@@ -606,6 +616,12 @@ impl<Factory: ServiceFactory> Deref for LightComponents<Factory> {
}
}
impl<Factory: ServiceFactory> DerefMut for LightComponents<Factory> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.service
}
}
impl<Factory: ServiceFactory> Future for LightComponents<Factory> {
type Item = ();
type Error = ();
@@ -615,6 +631,16 @@ impl<Factory: ServiceFactory> Future for LightComponents<Factory> {
}
}
impl<Factory: ServiceFactory> Executor<Box<dyn Future<Item = (), Error = ()> + Send>>
for LightComponents<Factory> {
fn execute(
&self,
future: Box<dyn Future<Item = (), Error = ()> + Send>
) -> Result<(), futures::future::ExecuteError<Box<dyn Future<Item = (), Error = ()> + Send>>> {
self.service.execute(future)
}
}
impl<Factory: ServiceFactory> Components for LightComponents<Factory> {
type Factory = Factory;
type Executor = LightExecutor<Factory>;
+52 -37
View File
@@ -93,7 +93,7 @@ pub struct Service<Components: components::Components> {
/// The elements must then be polled manually.
to_poll: Vec<Box<dyn Future<Item = (), Error = ()> + Send>>,
/// Configuration of this Service
pub config: FactoryFullConfiguration<Components::Factory>,
config: FactoryFullConfiguration<Components::Factory>,
rpc_handlers: rpc::RpcHandler,
_rpc: Box<dyn std::any::Any + Send + Sync>,
_telemetry: Option<tel::Telemetry>,
@@ -149,13 +149,6 @@ pub struct TelemetryOnConnect {
}
impl<Components: components::Components> Service<Components> {
/// Get event stream for telemetry connection established events.
pub fn telemetry_on_connect_stream(&self) -> TelemetryOnConnectNotifications {
let (sink, stream) = mpsc::unbounded();
self._telemetry_on_connect_sinks.lock().push(sink);
stream
}
/// Creates a new service.
pub fn new(
mut config: FactoryFullConfiguration<Components::Factory>,
@@ -200,7 +193,7 @@ impl<Components: components::Components> Service<Components> {
let transaction_pool = Arc::new(
Components::build_transaction_pool(config.transaction_pool.clone(), client.clone())?
);
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter::<Components> {
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
imports_external_transactions: !config.roles.is_light(),
pool: transaction_pool.clone(),
client: client.clone(),
@@ -383,9 +376,9 @@ impl<Components: components::Components> Service<Components> {
)
};
let rpc_handlers = gen_handler();
let rpc = start_rpc_servers::<Components::Factory, _>(&config, gen_handler)?;
let rpc = start_rpc_servers(&config, gen_handler)?;
let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future::<Components, _, _>(
let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future(
network_mut,
client.clone(),
network_status_sinks.clone(),
@@ -460,6 +453,27 @@ impl<Components: components::Components> Service<Components> {
})
}
/// Returns a reference to the config passed at initialization.
pub fn config(&self) -> &FactoryFullConfiguration<Components::Factory> {
&self.config
}
/// Returns a reference to the config passed at initialization.
///
/// > **Note**: This method is currently necessary because we extract some elements from the
/// > configuration at the end of the service initialization. It is intended to be
/// > removed.
pub fn config_mut(&mut self) -> &mut FactoryFullConfiguration<Components::Factory> {
&mut self.config
}
/// Get event stream for telemetry connection established events.
pub fn telemetry_on_connect_stream(&self) -> TelemetryOnConnectNotifications {
let (sink, stream) = mpsc::unbounded();
self._telemetry_on_connect_sinks.lock().push(sink);
stream
}
/// Return a shared instance of Telemetry (if enabled)
pub fn telemetry(&self) -> Option<tel::Telemetry> {
self._telemetry.as_ref().map(|t| t.clone())
@@ -577,14 +591,15 @@ impl<Components> Executor<Box<dyn Future<Item = (), Error = ()> + Send>>
///
/// The `status_sink` contain a list of senders to send a periodic network status to.
fn build_network_future<
Components: components::Components,
S: network::specialization::NetworkSpecialization<ComponentBlock<Components>>,
B: BlockT,
C: client::BlockchainEvents<B>,
S: network::specialization::NetworkSpecialization<B>,
H: network::ExHashT
> (
mut network: network::NetworkWorker<ComponentBlock<Components>, S, H>,
client: Arc<ComponentClient<Components>>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<(NetworkStatus<ComponentBlock<Components>>, NetworkState)>>>>,
rpc_rx: futures03::channel::mpsc::UnboundedReceiver<rpc::apis::system::Request<ComponentBlock<Components>>>,
mut network: network::NetworkWorker<B, S, H>,
client: Arc<C>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<(NetworkStatus<B>, NetworkState)>>>>,
rpc_rx: futures03::channel::mpsc::UnboundedReceiver<rpc::apis::system::Request<B>>,
should_have_peers: bool,
) -> impl Future<Item = (), Error = ()> {
// Compatibility shim while we're transitionning to stable Futures.
@@ -710,8 +725,8 @@ impl<Components> Drop for Service<Components> where Components: components::Comp
/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them alive.
#[cfg(not(target_os = "unknown"))]
fn start_rpc_servers<F: ServiceFactory, H: FnMut() -> rpc::RpcHandler>(
config: &FactoryFullConfiguration<F>,
fn start_rpc_servers<C, G, H: FnMut() -> rpc::RpcHandler>(
config: &Configuration<C, G>,
mut gen_handler: H
) -> Result<Box<dyn std::any::Any + Send + Sync>, error::Error> {
fn maybe_start_server<T, F>(address: Option<SocketAddr>, mut start: F) -> Result<Option<T>, io::Error>
@@ -751,8 +766,8 @@ fn start_rpc_servers<F: ServiceFactory, H: FnMut() -> rpc::RpcHandler>(
/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them alive.
#[cfg(target_os = "unknown")]
fn start_rpc_servers<F: ServiceFactory, H: FnMut() -> rpc::RpcHandler>(
_: &FactoryFullConfiguration<F>,
fn start_rpc_servers<C, G, H: FnMut() -> rpc::RpcHandler>(
_: &Configuration<C, G>,
_: H
) -> Result<Box<std::any::Any + Send + Sync>, error::Error> {
Ok(Box::new(()))
@@ -779,16 +794,10 @@ impl RpcSession {
}
/// Transaction pool adapter.
pub struct TransactionPoolAdapter<C: Components> {
pub struct TransactionPoolAdapter<C, P> {
imports_external_transactions: bool,
pool: Arc<TransactionPool<C::TransactionPoolApi>>,
client: Arc<ComponentClient<C>>,
}
impl<C: Components> TransactionPoolAdapter<C> {
fn best_block_id(&self) -> Option<BlockId<ComponentBlock<C>>> {
Some(BlockId::hash(self.client.info().chain.best_hash))
}
pool: Arc<P>,
client: Arc<C>,
}
/// Get transactions for propagation.
@@ -812,14 +821,20 @@ where
.collect()
}
impl<C: Components> network::TransactionPool<ComponentExHash<C>, ComponentBlock<C>> for
TransactionPoolAdapter<C> where <C as components::Components>::RuntimeApi: Send + Sync
impl<B, H, C, PoolApi, E> network::TransactionPool<H, B> for
TransactionPoolAdapter<C, TransactionPool<PoolApi>>
where
C: network::ClientHandle<B> + Send + Sync,
PoolApi: ChainApi<Block=B, Hash=H, Error=E>,
B: BlockT,
H: std::hash::Hash + Eq + sr_primitives::traits::Member + serde::Serialize,
E: txpool::error::IntoPoolError + From<txpool::error::Error>,
{
fn transactions(&self) -> Vec<(ComponentExHash<C>, ComponentExtrinsic<C>)> {
fn transactions(&self) -> Vec<(H, <B as BlockT>::Extrinsic)> {
transactions_to_propagate(&self.pool)
}
fn import(&self, transaction: &ComponentExtrinsic<C>) -> Option<ComponentExHash<C>> {
fn import(&self, transaction: &<B as BlockT>::Extrinsic) -> Option<H> {
if !self.imports_external_transactions {
debug!("Transaction rejected");
return None;
@@ -828,12 +843,12 @@ impl<C: Components> network::TransactionPool<ComponentExHash<C>, ComponentBlock<
let encoded = transaction.encode();
match Decode::decode(&mut &encoded[..]) {
Ok(uxt) => {
let best_block_id = self.best_block_id()?;
let best_block_id = BlockId::hash(self.client.info().chain.best_hash);
match self.pool.submit_one(&best_block_id, uxt) {
Ok(hash) => Some(hash),
Err(e) => match e.into_pool_error() {
Ok(txpool::error::Error::AlreadyImported(hash)) => {
hash.downcast::<ComponentExHash<C>>().ok()
hash.downcast::<H>().ok()
.map(|x| x.as_ref().clone())
},
Ok(e) => {
@@ -854,7 +869,7 @@ impl<C: Components> network::TransactionPool<ComponentExHash<C>, ComponentBlock<
}
}
fn on_broadcasted(&self, propagations: HashMap<ComponentExHash<C>, Vec<String>>) {
fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>) {
self.pool.on_broadcasted(propagations)
}
}