mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-30 03:47:57 +00:00
Rc5 (#143)
* Update to rc6. * Update runtime. * Update node to rc6. * Update client. * Fix node. * Add option to enable telemetry.
This commit is contained in:
+191
-180
@@ -20,20 +20,20 @@
|
||||
|
||||
use async_std::task;
|
||||
use futures::{
|
||||
channel::mpsc,
|
||||
compat::{
|
||||
Compat01As03,
|
||||
Compat01As03Sink,
|
||||
Sink01CompatExt,
|
||||
Stream01CompatExt,
|
||||
},
|
||||
future::poll_fn,
|
||||
sink::SinkExt,
|
||||
stream::{
|
||||
Stream,
|
||||
StreamExt,
|
||||
future::{
|
||||
select,
|
||||
FutureExt,
|
||||
},
|
||||
sink::SinkExt,
|
||||
stream::StreamExt,
|
||||
};
|
||||
use futures01::sync::mpsc;
|
||||
use futures01::sync::mpsc as mpsc01;
|
||||
use jsonrpsee::{
|
||||
common::{
|
||||
Request,
|
||||
@@ -53,16 +53,18 @@ use sc_service::{
|
||||
config::{
|
||||
NetworkConfiguration,
|
||||
TaskType,
|
||||
TelemetryEndpoints,
|
||||
},
|
||||
AbstractService,
|
||||
ChainSpec,
|
||||
Configuration,
|
||||
RpcHandlers,
|
||||
RpcSession,
|
||||
TaskManager,
|
||||
};
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::Poll,
|
||||
sync::Arc,
|
||||
};
|
||||
use thiserror::Error;
|
||||
|
||||
@@ -74,81 +76,59 @@ pub enum SubxtClientError {
|
||||
Json(#[from] serde_json::Error),
|
||||
/// Channel closed.
|
||||
#[error("{0}")]
|
||||
Mpsc(#[from] mpsc::SendError<String>),
|
||||
}
|
||||
|
||||
/// Role of the node.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum Role {
|
||||
/// Light client.
|
||||
Light,
|
||||
/// A full node (maninly used for testing purposes).
|
||||
Authority(sp_keyring::AccountKeyring),
|
||||
}
|
||||
|
||||
impl From<Role> for sc_service::Role {
|
||||
fn from(role: Role) -> Self {
|
||||
match role {
|
||||
Role::Light => Self::Light,
|
||||
Role::Authority(_) => {
|
||||
Self::Authority {
|
||||
sentry_nodes: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Role> for Option<String> {
|
||||
fn from(role: Role) -> Self {
|
||||
match role {
|
||||
Role::Light => None,
|
||||
Role::Authority(key) => Some(key.to_seed()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Client configuration.
|
||||
#[derive(Clone)]
|
||||
pub struct SubxtClientConfig<C: ChainSpec + 'static, S: AbstractService> {
|
||||
/// Name of the implementation.
|
||||
pub impl_name: &'static str,
|
||||
/// Version of the implementation.
|
||||
pub impl_version: &'static str,
|
||||
/// Author of the implementation.
|
||||
pub author: &'static str,
|
||||
/// Copyright start year.
|
||||
pub copyright_start_year: i32,
|
||||
/// Database configuration.
|
||||
pub db: DatabaseConfig,
|
||||
/// Keystore configuration.
|
||||
pub keystore: KeystoreConfig,
|
||||
/// Service builder.
|
||||
pub builder: fn(Configuration) -> Result<S, sc_service::Error>,
|
||||
/// Chain specification.
|
||||
pub chain_spec: C,
|
||||
/// Role of the node.
|
||||
pub role: Role,
|
||||
Mpsc(#[from] mpsc::SendError),
|
||||
}
|
||||
|
||||
/// Client for an embedded substrate node.
|
||||
pub struct SubxtClient {
|
||||
to_back: Compat01As03Sink<mpsc::Sender<String>, String>,
|
||||
from_back: Compat01As03<mpsc::Receiver<String>>,
|
||||
to_back: mpsc::Sender<String>,
|
||||
from_back: Compat01As03<mpsc01::Receiver<String>>,
|
||||
}
|
||||
|
||||
impl SubxtClient {
|
||||
/// Create a new client from a config.
|
||||
pub fn new<C: ChainSpec + 'static, S: AbstractService>(
|
||||
config: SubxtClientConfig<C, S>,
|
||||
) -> Result<Self, ServiceError> {
|
||||
/// Create a new client.
|
||||
pub fn new(mut task_manager: TaskManager, rpc: Arc<RpcHandlers>) -> Self {
|
||||
let (to_back, from_front) = mpsc::channel(4);
|
||||
let (to_front, from_back) = mpsc::channel(4);
|
||||
start_subxt_client(config, from_front, to_front)?;
|
||||
Ok(Self {
|
||||
to_back: to_back.sink_compat(),
|
||||
let (to_front, from_back) = mpsc01::channel(4);
|
||||
|
||||
let session = RpcSession::new(to_front.clone());
|
||||
let session2 = session.clone();
|
||||
task::spawn(
|
||||
select(
|
||||
Box::pin(from_front.for_each(move |message: String| {
|
||||
let rpc = rpc.clone();
|
||||
let session = session2.clone();
|
||||
let mut to_front = to_front.clone().sink_compat();
|
||||
async move {
|
||||
let response = rpc.rpc_query(&session, &message).await;
|
||||
if let Some(response) = response {
|
||||
to_front.send(response).await.ok();
|
||||
}
|
||||
}
|
||||
})),
|
||||
Box::pin(async move {
|
||||
task_manager.future().await.ok();
|
||||
}),
|
||||
)
|
||||
.map(drop),
|
||||
);
|
||||
|
||||
Self {
|
||||
to_back,
|
||||
from_back: from_back.compat(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new client from a config.
|
||||
pub fn from_config<C: ChainSpec + 'static>(
|
||||
config: SubxtClientConfig<C>,
|
||||
builder: impl Fn(
|
||||
Configuration,
|
||||
) -> Result<(TaskManager, Arc<RpcHandlers>), ServiceError>,
|
||||
) -> Result<Self, ServiceError> {
|
||||
let config = config.to_service_config();
|
||||
let (task_manager, rpc_handlers) = (builder)(config)?;
|
||||
Ok(Self::new(task_manager, rpc_handlers))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -188,113 +168,140 @@ impl From<SubxtClient> for jsonrpsee::Client {
|
||||
}
|
||||
}
|
||||
|
||||
fn start_subxt_client<C: ChainSpec + 'static, S: AbstractService>(
|
||||
config: SubxtClientConfig<C, S>,
|
||||
from_front: mpsc::Receiver<String>,
|
||||
to_front: mpsc::Sender<String>,
|
||||
) -> Result<(), ServiceError> {
|
||||
let mut network = NetworkConfiguration::new(
|
||||
format!("{} (subxt client)", config.chain_spec.name()),
|
||||
"unknown",
|
||||
Default::default(),
|
||||
None,
|
||||
);
|
||||
network.boot_nodes = config.chain_spec.boot_nodes().to_vec();
|
||||
network.transport = TransportConfig::Normal {
|
||||
enable_mdns: true,
|
||||
allow_private_ipv4: true,
|
||||
wasm_external_transport: None,
|
||||
use_yamux_flow_control: true,
|
||||
};
|
||||
let service_config = Configuration {
|
||||
network,
|
||||
impl_name: config.impl_name,
|
||||
impl_version: config.impl_version,
|
||||
chain_spec: Box::new(config.chain_spec),
|
||||
role: config.role.into(),
|
||||
task_executor: (move |fut, ty| {
|
||||
match ty {
|
||||
TaskType::Async => task::spawn(fut),
|
||||
TaskType::Blocking => task::spawn_blocking(|| task::block_on(fut)),
|
||||
};
|
||||
})
|
||||
.into(),
|
||||
database: config.db,
|
||||
keystore: config.keystore,
|
||||
max_runtime_instances: 8,
|
||||
announce_block: true,
|
||||
dev_key_seed: config.role.into(),
|
||||
/// Role of the node.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum Role {
|
||||
/// Light client.
|
||||
Light,
|
||||
/// A full node (maninly used for testing purposes).
|
||||
Authority(sp_keyring::AccountKeyring),
|
||||
}
|
||||
|
||||
telemetry_endpoints: Default::default(),
|
||||
telemetry_external_transport: Default::default(),
|
||||
default_heap_pages: Default::default(),
|
||||
disable_grandpa: Default::default(),
|
||||
execution_strategies: Default::default(),
|
||||
force_authoring: Default::default(),
|
||||
offchain_worker: Default::default(),
|
||||
prometheus_config: Default::default(),
|
||||
pruning: Default::default(),
|
||||
rpc_cors: Default::default(),
|
||||
rpc_http: Default::default(),
|
||||
rpc_ipc: Default::default(),
|
||||
rpc_ws: Default::default(),
|
||||
rpc_ws_max_connections: Default::default(),
|
||||
rpc_methods: Default::default(),
|
||||
state_cache_child_ratio: Default::default(),
|
||||
state_cache_size: Default::default(),
|
||||
tracing_receiver: Default::default(),
|
||||
tracing_targets: Default::default(),
|
||||
transaction_pool: Default::default(),
|
||||
wasm_method: Default::default(),
|
||||
base_path: Default::default(),
|
||||
informant_output_format: Default::default(),
|
||||
};
|
||||
|
||||
log::info!("{}", service_config.impl_name);
|
||||
log::info!("✌️ version {}", service_config.impl_version);
|
||||
log::info!("❤️ by {}, {}", config.author, config.copyright_start_year);
|
||||
log::info!(
|
||||
"📋 Chain specification: {}",
|
||||
service_config.chain_spec.name()
|
||||
);
|
||||
log::info!("🏷 Node name: {}", service_config.network.node_name);
|
||||
log::info!("👤 Role: {:?}", service_config.role);
|
||||
|
||||
// Create the service. This is the most heavy initialization step.
|
||||
let mut service = (config.builder)(service_config)?;
|
||||
|
||||
// Spawn background task.
|
||||
let session = RpcSession::new(to_front.clone());
|
||||
let mut from_front = from_front.compat();
|
||||
task::spawn(poll_fn(move |cx| {
|
||||
loop {
|
||||
match Pin::new(&mut from_front).poll_next(cx) {
|
||||
Poll::Ready(Some(message)) => {
|
||||
let mut to_front = to_front.clone().sink_compat();
|
||||
let message = message
|
||||
.expect("v1 streams require an error type; Stream of String can't fail; qed");
|
||||
let fut = service.rpc_query(&session, &message);
|
||||
task::spawn(async move {
|
||||
if let Some(response) = fut.await {
|
||||
to_front.send(response).await.ok();
|
||||
}
|
||||
});
|
||||
impl From<Role> for sc_service::Role {
|
||||
fn from(role: Role) -> Self {
|
||||
match role {
|
||||
Role::Light => Self::Light,
|
||||
Role::Authority(_) => {
|
||||
Self::Authority {
|
||||
sentry_nodes: Default::default(),
|
||||
}
|
||||
Poll::Pending => break,
|
||||
Poll::Ready(None) => return Poll::Ready(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
match Pin::new(&mut service).poll(cx) {
|
||||
Poll::Ready(Ok(())) => return Poll::Ready(()),
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(Err(e)) => log::error!("{}", e),
|
||||
}
|
||||
impl From<Role> for Option<String> {
|
||||
fn from(role: Role) -> Self {
|
||||
match role {
|
||||
Role::Light => None,
|
||||
Role::Authority(key) => Some(key.to_seed()),
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
/// Client configuration.
|
||||
#[derive(Clone)]
|
||||
pub struct SubxtClientConfig<C: ChainSpec + 'static> {
|
||||
/// Name of the implementation.
|
||||
pub impl_name: &'static str,
|
||||
/// Version of the implementation.
|
||||
pub impl_version: &'static str,
|
||||
/// Author of the implementation.
|
||||
pub author: &'static str,
|
||||
/// Copyright start year.
|
||||
pub copyright_start_year: i32,
|
||||
/// Database configuration.
|
||||
pub db: DatabaseConfig,
|
||||
/// Keystore configuration.
|
||||
pub keystore: KeystoreConfig,
|
||||
/// Chain specification.
|
||||
pub chain_spec: C,
|
||||
/// Role of the node.
|
||||
pub role: Role,
|
||||
/// Enable telemetry.
|
||||
pub enable_telemetry: bool,
|
||||
}
|
||||
|
||||
impl<C: ChainSpec + 'static> SubxtClientConfig<C> {
|
||||
/// Creates a service configuration.
|
||||
pub fn to_service_config(self) -> Configuration {
|
||||
let mut network = NetworkConfiguration::new(
|
||||
format!("{} (subxt client)", self.chain_spec.name()),
|
||||
"unknown",
|
||||
Default::default(),
|
||||
None,
|
||||
);
|
||||
network.boot_nodes = self.chain_spec.boot_nodes().to_vec();
|
||||
network.transport = TransportConfig::Normal {
|
||||
enable_mdns: true,
|
||||
allow_private_ipv4: true,
|
||||
wasm_external_transport: None,
|
||||
use_yamux_flow_control: true,
|
||||
};
|
||||
let telemetry_endpoints = if self.enable_telemetry {
|
||||
let endpoints =
|
||||
TelemetryEndpoints::new(vec![("/ip4/127.0.0.1/tcp/99000/ws".into(), 0)])
|
||||
.expect("valid config; qed");
|
||||
Some(endpoints)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let service_config = Configuration {
|
||||
network,
|
||||
impl_name: self.impl_name.to_string(),
|
||||
impl_version: self.impl_version.to_string(),
|
||||
chain_spec: Box::new(self.chain_spec),
|
||||
role: self.role.into(),
|
||||
task_executor: (move |fut, ty| {
|
||||
match ty {
|
||||
TaskType::Async => task::spawn(fut),
|
||||
TaskType::Blocking => task::spawn_blocking(|| task::block_on(fut)),
|
||||
}
|
||||
})
|
||||
.into(),
|
||||
database: self.db,
|
||||
keystore: self.keystore,
|
||||
max_runtime_instances: 8,
|
||||
announce_block: true,
|
||||
dev_key_seed: self.role.into(),
|
||||
telemetry_endpoints,
|
||||
|
||||
telemetry_external_transport: Default::default(),
|
||||
default_heap_pages: Default::default(),
|
||||
disable_grandpa: Default::default(),
|
||||
execution_strategies: Default::default(),
|
||||
force_authoring: Default::default(),
|
||||
offchain_worker: Default::default(),
|
||||
prometheus_config: Default::default(),
|
||||
pruning: Default::default(),
|
||||
rpc_cors: Default::default(),
|
||||
rpc_http: Default::default(),
|
||||
rpc_ipc: Default::default(),
|
||||
rpc_ws: Default::default(),
|
||||
rpc_ws_max_connections: Default::default(),
|
||||
rpc_methods: Default::default(),
|
||||
state_cache_child_ratio: Default::default(),
|
||||
state_cache_size: Default::default(),
|
||||
tracing_receiver: Default::default(),
|
||||
tracing_targets: Default::default(),
|
||||
transaction_pool: Default::default(),
|
||||
wasm_method: Default::default(),
|
||||
base_path: Default::default(),
|
||||
informant_output_format: Default::default(),
|
||||
};
|
||||
|
||||
log::info!("{}", service_config.impl_name);
|
||||
log::info!("✌️ version {}", service_config.impl_version);
|
||||
log::info!("❤️ by {}, {}", self.author, self.copyright_start_year);
|
||||
log::info!(
|
||||
"📋 Chain specification: {}",
|
||||
service_config.chain_spec.name()
|
||||
);
|
||||
log::info!("🏷 Node name: {}", service_config.network.node_name);
|
||||
log::info!("👤 Role: {:?}", self.role);
|
||||
|
||||
service_config
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -347,12 +354,14 @@ mod tests {
|
||||
cache_size: 64,
|
||||
},
|
||||
keystore: KeystoreConfig::InMemory,
|
||||
builder: test_node::service::new_light,
|
||||
chain_spec,
|
||||
role: Role::Light,
|
||||
enable_telemetry: false,
|
||||
};
|
||||
let client = ClientBuilder::<NodeTemplateRuntime>::new()
|
||||
.set_client(SubxtClient::new(config).unwrap())
|
||||
.set_client(
|
||||
SubxtClient::from_config(config, test_node::service::new_light).unwrap(),
|
||||
)
|
||||
.build()
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -378,12 +387,14 @@ mod tests {
|
||||
cache_size: 128,
|
||||
},
|
||||
keystore: KeystoreConfig::InMemory,
|
||||
builder: test_node::service::new_full,
|
||||
chain_spec: test_node::chain_spec::development_config(),
|
||||
chain_spec: test_node::chain_spec::development_config().unwrap(),
|
||||
role: Role::Authority(AccountKeyring::Alice),
|
||||
enable_telemetry: false,
|
||||
};
|
||||
let client = ClientBuilder::<NodeTemplateRuntime>::new()
|
||||
.set_client(SubxtClient::new(config).unwrap())
|
||||
.set_client(
|
||||
SubxtClient::from_config(config, test_node::service::new_full).unwrap(),
|
||||
)
|
||||
.build()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
Reference in New Issue
Block a user