Initial: Offchain Workers (#1942)

* Refactor state-machine stuff.

* Fix tests.

* WiP

* WiP2

* Service support for offchain workers.

* Service support for offchain workers.

* Testing offchain worker.

* Initial version working.

* Pass side effects in call.

* Pass OffchainExt in context.

* Submit extrinsics to the pool.

* Support inherents.

* Insert to inherents pool.

* Inserting to the pool asynchronously.

* Add test to offchain worker.

* Implement convenience syntax for modules.

* Dispatching offchain worker through executive.

* Fix offchain test.

* Remove offchain worker from timestamp.

* Update Cargo.lock.

* Address review comments.

* Use latest patch version for futures.

* Add CLI parameter for offchain worker.

* Fix compilation.

* Fix test.

* Fix extrinsics format for tests.

* Fix RPC test.

* Bump spec version.

* Fix executive.

* Fix support macro.

* Address grumbles.

* Bump runtime
This commit is contained in:
Tomasz Drwięga
2019-03-25 23:22:11 +01:00
committed by Gav Wood
parent 3ee0e69463
commit e2f5e40876
58 changed files with 1158 additions and 178 deletions
+8
View File
@@ -116,35 +116,43 @@ impl<G: RuntimeGenesis> Clone for ChainSpec<G> {
}
impl<G: RuntimeGenesis> ChainSpec<G> {
/// A list of bootnode addresses.
pub fn boot_nodes(&self) -> &[String] {
&self.spec.boot_nodes
}
/// Spec name.
pub fn name(&self) -> &str {
&self.spec.name
}
/// Spec id.
pub fn id(&self) -> &str {
&self.spec.id
}
/// Telemetry endpoints (if any)
pub fn telemetry_endpoints(&self) -> &Option<TelemetryEndpoints> {
&self.spec.telemetry_endpoints
}
/// Network protocol id.
pub fn protocol_id(&self) -> Option<&str> {
self.spec.protocol_id.as_ref().map(String::as_str)
}
/// Name of the consensus engine.
pub fn consensus_engine(&self) -> Option<&str> {
self.spec.consensus_engine.as_ref().map(String::as_str)
}
/// Additional loosly-typed properties of the chain.
pub fn properties(&self) -> Properties {
// Return an empty JSON object if 'properties' not defined in config
self.spec.properties.as_ref().unwrap_or(&json::map::Map::new()).clone()
}
/// Add a bootnode to the list.
pub fn add_boot_node(&mut self, addr: Multiaddr) {
self.spec.boot_nodes.push(addr.to_string())
}
+44 -19
View File
@@ -21,7 +21,7 @@ use serde::{Serialize, de::DeserializeOwned};
use tokio::runtime::TaskExecutor;
use crate::chain_spec::ChainSpec;
use client_db;
use client::{self, Client, runtime_api::{Metadata, TaggedTransactionQueue}};
use client::{self, Client, runtime_api};
use crate::{error, Service, maybe_start_server};
use consensus_common::import_queue::ImportQueue;
use network::{self, OnDemand};
@@ -150,7 +150,7 @@ pub trait StartRPC<C: Components> {
impl<C: Components> StartRPC<Self> for C where
ComponentClient<C>: ProvideRuntimeApi,
<ComponentClient<C> as ProvideRuntimeApi>::Api: Metadata<ComponentBlock<C>>,
<ComponentClient<C> as ProvideRuntimeApi>::Api: runtime_api::Metadata<ComponentBlock<C>>,
{
type ServersHandle = (Option<rpc::HttpServer>, Option<Mutex<rpc::WsServer>>);
@@ -192,14 +192,14 @@ impl<C: Components> StartRPC<Self> for C where
/// Something that can maintain transaction pool on every imported block.
pub trait MaintainTransactionPool<C: Components> {
fn on_block_imported(
fn maintain_transaction_pool(
id: &BlockId<ComponentBlock<C>>,
client: &ComponentClient<C>,
transaction_pool: &TransactionPool<C::TransactionPoolApi>,
) -> error::Result<()>;
}
fn on_block_imported<Api, Backend, Block, Executor, PoolApi>(
fn maintain_transaction_pool<Api, Backend, Block, Executor, PoolApi>(
id: &BlockId<Block>,
client: &Client<Backend, Executor, Block, Api>,
transaction_pool: &TransactionPool<PoolApi>,
@@ -207,7 +207,7 @@ fn on_block_imported<Api, Backend, Block, Executor, PoolApi>(
Block: BlockT<Hash = <Blake2Hasher as ::primitives::Hasher>::Out>,
Backend: client::backend::Backend<Block, Blake2Hasher>,
Client<Backend, Executor, Block, Api>: ProvideRuntimeApi,
<Client<Backend, Executor, Block, Api> as ProvideRuntimeApi>::Api: TaggedTransactionQueue<Block>,
<Client<Backend, Executor, Block, Api> as ProvideRuntimeApi>::Api: runtime_api::TaggedTransactionQueue<Block>,
Executor: client::CallExecutor<Block, Blake2Hasher>,
PoolApi: txpool::ChainApi<Hash = Block::Hash, Block = Block>,
{
@@ -227,14 +227,35 @@ fn on_block_imported<Api, Backend, Block, Executor, PoolApi>(
impl<C: Components> MaintainTransactionPool<Self> for C where
ComponentClient<C>: ProvideRuntimeApi,
<ComponentClient<C> as ProvideRuntimeApi>::Api: TaggedTransactionQueue<ComponentBlock<C>>,
<ComponentClient<C> as ProvideRuntimeApi>::Api: runtime_api::TaggedTransactionQueue<ComponentBlock<C>>,
{
fn on_block_imported(
fn maintain_transaction_pool(
id: &BlockId<ComponentBlock<C>>,
client: &ComponentClient<C>,
transaction_pool: &TransactionPool<C::TransactionPoolApi>,
) -> error::Result<()> {
on_block_imported(id, client, transaction_pool)
maintain_transaction_pool(id, client, transaction_pool)
}
}
pub trait OffchainWorker<C: Components> {
fn offchain_workers(
number: &FactoryBlockNumber<C::Factory>,
offchain: &offchain::OffchainWorkers<ComponentClient<C>, ComponentBlock<C>>,
pool: &Arc<TransactionPool<C::TransactionPoolApi>>,
) -> error::Result<()>;
}
impl<C: Components> OffchainWorker<Self> for C where
ComponentClient<C>: ProvideRuntimeApi,
<ComponentClient<C> as ProvideRuntimeApi>::Api: offchain::OffchainWorkerApi<ComponentBlock<C>>,
{
fn offchain_workers(
number: &FactoryBlockNumber<C::Factory>,
offchain: &offchain::OffchainWorkers<ComponentClient<C>, ComponentBlock<C>>,
pool: &Arc<TransactionPool<C::TransactionPoolApi>>,
) -> error::Result<()> {
Ok(offchain.on_block_imported(number, pool))
}
}
@@ -246,9 +267,16 @@ pub trait ServiceTrait<C: Components>:
+ 'static
+ StartRPC<C>
+ MaintainTransactionPool<C>
+ OffchainWorker<C>
{}
impl<C: Components, T> ServiceTrait<C> for T where
T: Deref<Target = Service<C>> + Send + Sync + 'static + StartRPC<C> + MaintainTransactionPool<C>
T: Deref<Target = Service<C>>
+ Send
+ Sync
+ 'static
+ StartRPC<C>
+ MaintainTransactionPool<C>
+ OffchainWorker<C>
{}
/// A collection of types and methods to build a service on top of the substrate service.
@@ -338,17 +366,14 @@ pub trait Components: Sized + 'static {
type Executor: 'static + client::CallExecutor<FactoryBlock<Self::Factory>, Blake2Hasher> + Send + Sync + Clone;
/// The type that implements the runtime API.
type RuntimeApi: Send + Sync;
/// A type that can start the RPC.
type RPC: StartRPC<Self>;
/// A type that can start all runtime-dependent services.
type RuntimeServices: ServiceTrait<Self>;
// TODO: Traitify transaction pool and allow people to implement their own. (#1242)
/// A type that can maintain transaction pool.
type TransactionPool: MaintainTransactionPool<Self>;
/// Extrinsic pool type.
type TransactionPoolApi: 'static + txpool::ChainApi<
Hash = <FactoryBlock<Self::Factory> as BlockT>::Hash,
Block = FactoryBlock<Self::Factory>
>;
/// Our Import Queue
type ImportQueue: ImportQueue<FactoryBlock<Self::Factory>> + 'static;
@@ -382,6 +407,7 @@ pub struct FullComponents<Factory: ServiceFactory> {
}
impl<Factory: ServiceFactory> FullComponents<Factory> {
/// Create new `FullComponents`
pub fn new(
config: FactoryFullConfiguration<Factory>,
task_executor: TaskExecutor
@@ -416,8 +442,7 @@ impl<Factory: ServiceFactory> Components for FullComponents<Factory> {
type TransactionPoolApi = <Factory as ServiceFactory>::FullTransactionPoolApi;
type ImportQueue = Factory::FullImportQueue;
type RuntimeApi = Factory::RuntimeApi;
type RPC = Factory::FullService;
type TransactionPool = Factory::FullService;
type RuntimeServices = Factory::FullService;
fn build_client(
config: &FactoryFullConfiguration<Factory>,
@@ -462,6 +487,7 @@ pub struct LightComponents<Factory: ServiceFactory> {
}
impl<Factory: ServiceFactory> LightComponents<Factory> {
/// Create new `LightComponents`
pub fn new(
config: FactoryFullConfiguration<Factory>,
task_executor: TaskExecutor
@@ -490,8 +516,7 @@ impl<Factory: ServiceFactory> Components for LightComponents<Factory> {
type TransactionPoolApi = <Factory as ServiceFactory>::LightTransactionPoolApi;
type ImportQueue = <Factory as ServiceFactory>::LightImportQueue;
type RuntimeApi = Factory::RuntimeApi;
type RPC = Factory::LightService;
type TransactionPool = Factory::LightService;
type RuntimeServices = Factory::LightService;
fn build_client(
config: &FactoryFullConfiguration<Factory>,
@@ -564,7 +589,7 @@ mod tests {
// fire notification - this should clean up the queue
assert_eq!(pool.status().ready, 1);
on_block_imported(
maintain_transaction_pool(
&id,
&client,
&pool,
+3
View File
@@ -68,6 +68,8 @@ pub struct Configuration<C, G: Serialize + DeserializeOwned + BuildStorage> {
pub telemetry_endpoints: Option<TelemetryEndpoints>,
/// The default number of 64KB pages to allocate for Wasm execution
pub default_heap_pages: Option<u64>,
/// Should offchain workers be executed.
pub offchain_worker: bool,
/// Enable authoring even when offline.
pub force_authoring: bool,
/// Disable GRANDPA when running in validator mode
@@ -97,6 +99,7 @@ impl<C: Default, G: Serialize + DeserializeOwned + BuildStorage> Configuration<C
rpc_ws: None,
telemetry_endpoints: None,
default_heap_pages: None,
offchain_worker: Default::default(),
force_authoring: false,
disable_grandpa: false,
};
+51 -17
View File
@@ -17,7 +17,7 @@
//! Substrate service. Starts a thread that spins up the network, client, and extrinsic pool.
//! Manages communication between them.
#![warn(unused_extern_crates)]
#![warn(missing_docs)]
mod components;
mod error;
@@ -28,20 +28,18 @@ pub mod chain_ops;
use std::io;
use std::net::SocketAddr;
use std::collections::HashMap;
#[doc(hidden)]
pub use std::{ops::Deref, result::Result, sync::Arc};
use log::{info, warn, debug};
use futures::prelude::*;
use keystore::Store as Keystore;
use client::BlockchainEvents;
use exit_future::Signal;
use futures::prelude::*;
use inherents::pool::InherentsPool;
use keystore::Store as Keystore;
use log::{info, warn, debug};
use parity_codec::{Encode, Decode};
use primitives::Pair;
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Header, As};
use exit_future::Signal;
#[doc(hidden)]
pub use tokio::runtime::TaskExecutor;
use substrate_executor::NativeExecutor;
use parity_codec::{Encode, Decode};
use tel::{telemetry, SUBSTRATE_INFO};
pub use self::error::{ErrorKind, Error};
@@ -59,9 +57,13 @@ pub use components::{ServiceFactory, FullBackend, FullExecutor, LightBackend,
FactoryFullConfiguration, RuntimeGenesis, FactoryGenesis,
ComponentExHash, ComponentExtrinsic, FactoryExtrinsic
};
use components::{StartRPC, MaintainTransactionPool};
use components::{StartRPC, MaintainTransactionPool, OffchainWorker};
#[doc(hidden)]
pub use std::{ops::Deref, result::Result, sync::Arc};
#[doc(hidden)]
pub use network::OnDemand;
#[doc(hidden)]
pub use tokio::runtime::TaskExecutor;
const DEFAULT_PROTOCOL_ID: &str = "sup";
@@ -70,6 +72,7 @@ pub struct Service<Components: components::Components> {
client: Arc<ComponentClient<Components>>,
network: Option<Arc<components::NetworkService<Components::Factory>>>,
transaction_pool: Arc<TransactionPool<Components::TransactionPoolApi>>,
inherents_pool: Arc<InherentsPool<ComponentExtrinsic<Components>>>,
keystore: Keystore,
exit: ::exit_future::Exit,
signal: Option<Signal>,
@@ -77,6 +80,7 @@ pub struct Service<Components: components::Components> {
pub config: FactoryFullConfiguration<Components::Factory>,
_rpc: Box<::std::any::Any + Send + Sync>,
_telemetry: Option<Arc<tel::Telemetry>>,
_offchain_workers: Option<Arc<offchain::OffchainWorkers<ComponentClient<Components>, ComponentBlock<Components>>>>,
}
/// Creates bare client without any networking.
@@ -96,9 +100,7 @@ impl<Components: components::Components> Service<Components> {
pub fn new(
mut config: FactoryFullConfiguration<Components::Factory>,
task_executor: TaskExecutor,
)
-> Result<Self, error::Error>
{
) -> Result<Self, error::Error> {
let (signal, exit) = ::exit_future::signal();
// Create client
@@ -169,24 +171,48 @@ impl<Components: components::Components> Service<Components> {
)?;
on_demand.map(|on_demand| on_demand.set_network_sender(network_chan));
let inherents_pool = Arc::new(InherentsPool::default());
let offchain_workers = if config.offchain_worker {
Some(Arc::new(offchain::OffchainWorkers::new(
client.clone(),
inherents_pool.clone(),
task_executor.clone(),
)))
} else {
None
};
{
// block notifications
let network = Arc::downgrade(&network);
let txpool = Arc::downgrade(&transaction_pool);
let wclient = Arc::downgrade(&client);
let offchain = offchain_workers.as_ref().map(Arc::downgrade);
let events = client.import_notification_stream()
.for_each(move |notification| {
let number = *notification.header.number();
if let Some(network) = network.upgrade() {
network.on_block_imported(notification.hash, notification.header);
}
if let (Some(txpool), Some(client)) = (txpool.upgrade(), wclient.upgrade()) {
Components::TransactionPool::on_block_imported(
Components::RuntimeServices::maintain_transaction_pool(
&BlockId::hash(notification.hash),
&*client,
&*txpool,
).map_err(|e| warn!("Pool error processing new block: {:?}", e))?;
}
if let (Some(txpool), Some(offchain)) = (txpool.upgrade(), offchain.as_ref().and_then(|o| o.upgrade())) {
Components::RuntimeServices::offchain_workers(
&number,
&offchain,
&txpool,
).map_err(|e| warn!("Offchain workers error processing new block: {:?}", e))?;
}
Ok(())
})
.select(exit.clone())
@@ -264,7 +290,7 @@ impl<Components: components::Components> Service<Components> {
impl_version: config.impl_version.into(),
properties: config.chain_spec.properties(),
};
let rpc = Components::RPC::start_rpc(
let rpc = Components::RuntimeServices::start_rpc(
client.clone(), network.clone(), has_bootnodes, system_info, config.rpc_http,
config.rpc_ws, task_executor.clone(), transaction_pool.clone(),
)?;
@@ -299,12 +325,14 @@ impl<Components: components::Components> Service<Components> {
client,
network: Some(network),
transaction_pool,
inherents_pool,
signal: Some(signal),
keystore,
config,
exit,
_rpc: Box::new(rpc),
_telemetry: telemetry,
_offchain_workers: offchain_workers,
})
}
@@ -321,6 +349,7 @@ impl<Components: components::Components> Service<Components> {
}
}
/// return a shared instance of Telemtry (if enabled)
pub fn telemetry(&self) -> Option<Arc<tel::Telemetry>> {
self._telemetry.as_ref().map(|t| t.clone())
}
@@ -337,11 +366,16 @@ impl<Components> Service<Components> where Components: components::Components {
self.network.as_ref().expect("self.network always Some").clone()
}
/// Get shared extrinsic pool instance.
/// Get shared transaction pool instance.
pub fn transaction_pool(&self) -> Arc<TransactionPool<Components::TransactionPoolApi>> {
self.transaction_pool.clone()
}
/// Get shared inherents pool instance.
pub fn inherents_pool(&self) -> Arc<InherentsPool<ComponentExtrinsic<Components>>> {
self.inherents_pool.clone()
}
/// Get shared keystore.
pub fn keystore(&self) -> &Keystore {
&self.keystore