From a45bf143bb334cbefe1d58107181c78b81dd88b9 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 30 Nov 2021 19:28:46 +0100 Subject: [PATCH] add client example should be removed from this repo --- Cargo.toml | 2 +- client/Cargo.toml | 40 +++++++ client/src/lib.rs | 293 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 334 insertions(+), 1 deletion(-) create mode 100644 client/Cargo.toml create mode 100644 client/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 9b8a089454..6422d7da64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = [".", "cli", "codegen", "macro", "test-runtime"] +members = [".", "cli", "codegen", "macro", "test-runtime", "client"] [package] name = "subxt" diff --git a/client/Cargo.toml b/client/Cargo.toml new file mode 100644 index 0000000000..a0574e8f2b --- /dev/null +++ b/client/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "subxt-client" +version = "0.1.0" +authors = [ + "David Craven ", + "Parity Technologies ", +] +edition = "2018" + +license = "GPL-3.0" +repository = "https://github.com/paritytech/substrate-subxt" +documentation = "https://docs.rs/substrate-subxt-client" +homepage = "https://www.parity.io/" +description = "Embed a substrate node into your subxt application." +keywords = ["parity", "substrate", "blockchain"] + +[dependencies] +async-std = "1.8.0" +futures = "0.3.9" +jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee/", branch = "extract-async-client", features = ["client"] } +log = "0.4.13" +thiserror = "1.0.23" + +sc-client-db = { git = "https://github.com/paritytech/substrate.git", branch = "master" } +sp-keyring = { git = "https://github.com/paritytech/substrate.git", branch = "master" } +sc-network = { git = "https://github.com/paritytech/substrate.git", branch = "master", default-features = false } +sc-service = { git = "https://github.com/paritytech/substrate.git", branch = "master", default-features = false } + +tokio = { version = "1.10", features = ["rt-multi-thread"] } + +[target.'cfg(target_arch="x86_64")'.dependencies] +sc-service = { git = "https://github.com/paritytech/substrate.git", branch = "master", default-features = false, features = [ + "wasmtime", +] } + +[dev-dependencies] +async-std = { version = "1.8.0", features = ["attributes"] } +env_logger = "0.8.2" +node-cli = { git = "https://github.com/paritytech/substrate.git", branch = "master", default-features = false } +tempdir = "0.3.7" diff --git a/client/src/lib.rs b/client/src/lib.rs new file mode 100644 index 0000000000..bd1f5d6055 --- /dev/null +++ b/client/src/lib.rs @@ -0,0 +1,293 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of substrate-subxt. +// +// subxt is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// subxt is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with substrate-subxt. If not, see . + +//! Client for embedding substrate nodes. + +#![deny(missing_docs)] + +#[cfg(test)] +mod tests; + +use async_std::task; +use futures::{ + channel::mpsc, + future::{ + select, + FutureExt, + }, + sink::SinkExt, + stream::StreamExt, +}; +use jsonrpsee::{ + core_client::Client as JsonRpcClient, + types::{ + async_trait, + traits::{ + TransportReceiver, + TransportSender, + }, + }, +}; +use sc_network::config::TransportConfig; +pub use sc_service::{ + config::{ + DatabaseSource, + KeystoreConfig, + WasmExecutionMethod, + }, + Error as ServiceError, +}; +use sc_service::{ + config::{ + NetworkConfiguration, + TelemetryEndpoints, + }, + ChainSpec, + Configuration, + KeepBlocks, + RpcHandlers, + RpcSession, + TaskManager, +}; +use thiserror::Error; + +/// Error thrown by the client. +#[derive(Debug, Error)] +pub enum SubxtClientError { + /// Failed to parse json rpc message. + #[error("{0}")] + Json(#[from] serde_json::Error), + /// Channel closed. + #[error("{0}")] + Mpsc(#[from] mpsc::SendError), +} + +/// Sending end. +pub struct Sender(mpsc::UnboundedSender); + +/// Receiving end +pub struct Receiver(mpsc::UnboundedReceiver); + +#[async_trait] +impl TransportSender for Sender { + type Error = SubxtClientError; + + async fn send(&mut self, msg: String) -> Result<(), Self::Error> { + self.0.send(msg).await?; + Ok(()) + } +} + +#[async_trait] +impl TransportReceiver for Receiver { + type Error = SubxtClientError; + + async fn receive(&mut self) -> Result { + let msg = self.0.next().await.expect("channel should be open"); + Ok(msg) + } +} + +/// Client for an embedded substrate node. +pub struct SubxtClient { + sender: Sender, + receiver: Receiver, +} + +impl SubxtClient { + /// Create a new client. + pub fn new(mut task_manager: TaskManager, rpc: RpcHandlers) -> Self { + let (to_back, from_front) = mpsc::unbounded(); + let (to_front, from_back) = mpsc::unbounded(); + + let session = RpcSession::new(to_front.clone()); + task::spawn( + select( + Box::pin(from_front.for_each(move |message: String| { + let rpc = rpc.clone(); + let session = session.clone(); + let mut to_front = to_front.clone(); + async move { + let response = rpc.rpc_query(&session, &message).await; + if let Some(response) = response { + to_front.send(response).await.ok(); + } + } + })), + Box::pin(async move { + task_manager.future().await.ok(); + }), + ) + .map(drop), + ); + + Self { + sender: Sender(to_back), + receiver: Receiver(from_back), + } + } + + /// Creates a new client from a config. + pub fn from_config( + config: SubxtClientConfig, + builder: impl Fn(Configuration) -> Result<(TaskManager, RpcHandlers), ServiceError>, + ) -> Result { + let config = config.into_service_config(); + let (task_manager, rpc_handlers) = (builder)(config)?; + Ok(Self::new(task_manager, rpc_handlers)) + } +} + +impl From for JsonRpcClient { + fn from(client: SubxtClient) -> Self { + (client.sender, client.receiver).into() + } +} + +/// Role of the node. +#[derive(Clone, Copy, Debug)] +pub enum Role { + /// Light client. + Light, + /// A full node (mainly used for testing purposes). + Authority(sp_keyring::AccountKeyring), +} + +impl From for sc_service::Role { + fn from(role: Role) -> Self { + match role { + Role::Light => Self::Light, + Role::Authority(_) => Self::Authority, + } + } +} + +impl From for Option { + fn from(role: Role) -> Self { + match role { + Role::Light => None, + Role::Authority(key) => Some(key.to_seed()), + } + } +} + +/// Client configuration. +#[derive(Clone)] +pub struct SubxtClientConfig { + /// Name of the implementation. + pub impl_name: &'static str, + /// Version of the implementation. + pub impl_version: &'static str, + /// Author of the implementation. + pub author: &'static str, + /// Copyright start year. + pub copyright_start_year: i32, + /// Database configuration. + pub db: DatabaseSource, + /// Keystore configuration. + pub keystore: KeystoreConfig, + /// Chain specification. + pub chain_spec: C, + /// Role of the node. + pub role: Role, + /// Enable telemetry on the given port. + pub telemetry: Option, + /// Wasm execution method + pub wasm_method: WasmExecutionMethod, + /// Handle to the tokio runtime. Will be used to spawn futures by the task manager. + pub tokio_handle: tokio::runtime::Handle, +} + +impl SubxtClientConfig { + /// Creates a service configuration. + pub fn into_service_config(self) -> Configuration { + let mut network = NetworkConfiguration::new( + format!("{} (subxt client)", self.chain_spec.name()), + "unknown", + Default::default(), + None, + ); + network.boot_nodes = self.chain_spec.boot_nodes().to_vec(); + network.transport = TransportConfig::Normal { + enable_mdns: true, + allow_private_ipv4: true, + // wasm_external_transport: None, + }; + let telemetry_endpoints = if let Some(port) = self.telemetry { + let endpoints = TelemetryEndpoints::new(vec![( + format!("/ip4/127.0.0.1/tcp/{}/ws", port), + 0, + )]) + .expect("valid config; qed"); + Some(endpoints) + } else { + None + }; + let service_config = Configuration { + network, + impl_name: self.impl_name.to_string(), + impl_version: self.impl_version.to_string(), + chain_spec: Box::new(self.chain_spec), + role: self.role.into(), + database: self.db, + keystore: self.keystore, + max_runtime_instances: 8, + announce_block: true, + dev_key_seed: self.role.into(), + telemetry_endpoints, + tokio_handle: self.tokio_handle, + default_heap_pages: Default::default(), + disable_grandpa: Default::default(), + execution_strategies: Default::default(), + force_authoring: Default::default(), + keep_blocks: KeepBlocks::All, + keystore_remote: Default::default(), + offchain_worker: Default::default(), + prometheus_config: Default::default(), + rpc_cors: Default::default(), + rpc_http: Default::default(), + rpc_ipc: Default::default(), + rpc_ws: Default::default(), + rpc_ws_max_connections: Default::default(), + rpc_methods: Default::default(), + state_cache_child_ratio: Default::default(), + state_cache_size: Default::default(), + tracing_receiver: Default::default(), + tracing_targets: Default::default(), + transaction_pool: Default::default(), + wasm_method: self.wasm_method, + base_path: Default::default(), + informant_output_format: Default::default(), + state_pruning: Default::default(), + transaction_storage: sc_client_db::TransactionStorageMode::BlockBody, + wasm_runtime_overrides: Default::default(), + rpc_max_payload: Default::default(), + ws_max_out_buffer_capacity: Default::default(), + }; + + log::info!("{}", service_config.impl_name); + log::info!("✌️ version {}", service_config.impl_version); + log::info!("❤️ by {}, {}", self.author, self.copyright_start_year); + log::info!( + "📋 Chain specification: {}", + service_config.chain_spec.name() + ); + log::info!("🏷 Node name: {}", service_config.network.node_name); + log::info!("👤 Role: {:?}", self.role); + + service_config + } +}