Allow integrated relay chain light client (#2270)

* Add embedded light client to cli

* Prepare for light-client-worker

* First working version

* Clean up

* Remove unwanted logs

* Simplify subscription code

* Let jsonrpsee handle rpc management

* Simplify implementation

* Reorganize crate structure

* Use relay chain arg chainspec for light-client

* Clean up command line

* Add light client worker file

* Use smoldot master to avoid wasmtime conflict

* Remove sleep

* Improve naming of cli option

* Remove conflict with `validator`

* Improve docs

* Update smoldot, remove unwanted change

* Apply suggestions from code review

Co-authored-by: Dmitry Markin <dmitry@markin.tech>

* Disable collation

* Reviewer comments

* Update smoldot and tokio-platform

* Update smoldot

* Update smoldot

* Adjust to new version

* Patch substrate

* Use constants

* Add readme entry, improve zombienet tests

* Apply suggestions from code review

Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>

* Make execution mode an enum

* Update smoldot, remove substrate patch

* Update client/relay-chain-rpc-interface/src/rpc_client.rs

Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>

* Reduce duplicate code

* Update smoldot

* Update smoldot

* Fix build

* Update smoldot

* Make platform compile

* Clean up dependencies

* Use crates.io instead of github for smoldot

* Apply suggestions from code review

Co-authored-by: Davide Galassi <davxy@datawok.net>

* Docs

* Improve docs

* Remove `RpcFrontend`

---------

Co-authored-by: Dmitry Markin <dmitry@markin.tech>
Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>
Co-authored-by: Davide Galassi <davxy@datawok.net>
This commit is contained in:
Sebastian Kunert
2023-08-22 15:53:32 +02:00
committed by GitHub
parent 5eea3939de
commit 817c4503db
21 changed files with 1405 additions and 384 deletions
@@ -38,9 +38,15 @@ use std::pin::Pin;
use cumulus_primitives_core::relay_chain::BlockId;
pub use url::Url;
mod light_client_worker;
mod reconnecting_ws_client;
mod rpc_client;
pub use rpc_client::{create_client_and_start_worker, RelayChainRpcClient};
mod tokio_platform;
pub use rpc_client::{
create_client_and_start_light_client_worker, create_client_and_start_worker,
RelayChainRpcClient,
};
const TIMEOUT_IN_SECONDS: u64 = 6;
@@ -0,0 +1,297 @@
// Copyright 2023 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
//! This module contains a backend that sends RPC requests to an
//! embedded light client. Even though no networking is involved,
//! we treat the light-client as a normal JsonRPC target.
use futures::{channel::mpsc::Sender, prelude::*, stream::FuturesUnordered};
use jsonrpsee::core::{
client::{
Client as JsonRpseeClient, ClientBuilder, ClientT, ReceivedMessage, TransportReceiverT,
TransportSenderT,
},
Error,
};
use smoldot_light::{ChainId, Client as SmoldotClient, JsonRpcResponses};
use std::{num::NonZeroU32, sync::Arc};
use tokio::sync::mpsc::{channel as tokio_channel, Receiver, Sender as TokioSender};
use cumulus_primitives_core::relay_chain::{
Block as RelayBlock, BlockNumber as RelayNumber, Hash as RelayHash, Header as RelayHeader,
};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
use sp_runtime::generic::SignedBlock;
use sc_rpc_api::chain::ChainApiClient;
use sc_service::SpawnTaskHandle;
use crate::{rpc_client::RpcDispatcherMessage, tokio_platform::TokioPlatform};
const LOG_TARGET: &str = "rpc-light-client-worker";
const MAX_PENDING_REQUESTS: u32 = 128;
const MAX_SUBSCRIPTIONS: u32 = 64;
#[derive(thiserror::Error, Debug)]
enum LightClientError {
#[error("Error occured while executing smoldot request: {0}")]
SmoldotError(String),
#[error("Nothing returned from json_rpc_responses")]
EmptyResult,
}
/// Sending adapter allowing JsonRpsee to send messages to smoldot
struct SimpleStringSender {
inner: SmoldotClient<TokioPlatform, ()>,
chain_id: ChainId,
}
#[async_trait::async_trait]
impl TransportSenderT for SimpleStringSender {
type Error = LightClientError;
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
self.inner
.json_rpc_request(msg, self.chain_id)
.map_err(|e| LightClientError::SmoldotError(e.to_string()))
}
}
/// Receiving adapter allowing JsonRpsee to receive messages from smoldot
struct SimpleStringReceiver {
inner: JsonRpcResponses,
}
#[async_trait::async_trait]
impl TransportReceiverT for SimpleStringReceiver {
type Error = LightClientError;
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
self.inner
.next()
.await
.map(|message| jsonrpsee::core::client::ReceivedMessage::Text(message))
.ok_or(LightClientError::EmptyResult)
}
}
/// Build a smoldot client and add the specified chain spec to it.
pub async fn build_smoldot_client(
spawner: SpawnTaskHandle,
chain_spec: &str,
) -> RelayChainResult<(SmoldotClient<TokioPlatform, ()>, ChainId, JsonRpcResponses)> {
let platform = TokioPlatform::new(spawner);
let mut client = SmoldotClient::new(platform);
// Ask the client to connect to a chain.
let smoldot_light::AddChainSuccess { chain_id, json_rpc_responses } = client
.add_chain(smoldot_light::AddChainConfig {
specification: chain_spec,
json_rpc: smoldot_light::AddChainConfigJsonRpc::Enabled {
max_pending_requests: NonZeroU32::new(MAX_PENDING_REQUESTS)
.expect("Constant larger than 0; qed"),
max_subscriptions: MAX_SUBSCRIPTIONS,
},
potential_relay_chains: core::iter::empty(),
database_content: "",
user_data: (),
})
.map_err(|e| RelayChainError::GenericError(e.to_string()))?;
Ok((client, chain_id, json_rpc_responses.expect("JSON RPC is enabled; qed")))
}
/// Worker to process incoming [`RpcDispatcherMessage`] requests.
/// On startup, this worker opens subscriptions for imported, best and finalized
/// heads. Incoming notifications are distributed to registered listeners.
pub struct LightClientRpcWorker {
client_receiver: Receiver<RpcDispatcherMessage>,
imported_header_listeners: Vec<Sender<RelayHeader>>,
finalized_header_listeners: Vec<Sender<RelayHeader>>,
best_header_listeners: Vec<Sender<RelayHeader>>,
smoldot_client: Arc<JsonRpseeClient>,
}
fn handle_notification(
maybe_header: Option<Result<RelayHeader, Error>>,
senders: &mut Vec<Sender<RelayHeader>>,
) -> Result<(), ()> {
match maybe_header {
Some(Ok(header)) => {
crate::rpc_client::distribute_header(header, senders);
Ok(())
},
None => {
tracing::error!(target: LOG_TARGET, "Subscription closed.");
Err(())
},
Some(Err(error)) => {
tracing::error!(target: LOG_TARGET, ?error, "Error in RPC subscription.");
Err(())
},
}
}
impl LightClientRpcWorker {
/// Create new light-client worker.
///
/// Returns the worker itself and a channel to send messages.
pub fn new(
smoldot_client: smoldot_light::Client<TokioPlatform, ()>,
json_rpc_responses: JsonRpcResponses,
chain_id: ChainId,
) -> (LightClientRpcWorker, TokioSender<RpcDispatcherMessage>) {
let (tx, rx) = tokio_channel(100);
let smoldot_adapter_sender = SimpleStringSender { inner: smoldot_client, chain_id };
let smoldot_adapter_receiver = SimpleStringReceiver { inner: json_rpc_responses };
// Build jsonrpsee client that will talk to the inprocess smoldot node
let smoldot_jsonrpsee_client = Arc::new(
ClientBuilder::default()
.build_with_tokio(smoldot_adapter_sender, smoldot_adapter_receiver),
);
let worker = LightClientRpcWorker {
client_receiver: rx,
imported_header_listeners: Default::default(),
finalized_header_listeners: Default::default(),
best_header_listeners: Default::default(),
smoldot_client: smoldot_jsonrpsee_client,
};
(worker, tx)
}
// Main worker loop.
//
// Does the following:
// 1. Initialize notification streams
// 2. Enter main loop
// a. On listening request, register listener for respective notification stream
// b. On incoming notification, distribute notification to listeners
// c. On incoming request, forward request to smoldot
// d. Advance execution of pending requests
pub async fn run(mut self) {
let mut pending_requests = FuturesUnordered::new();
let Ok(mut new_head_subscription) = <JsonRpseeClient as ChainApiClient<
RelayNumber,
RelayHash,
RelayHeader,
SignedBlock<RelayBlock>,
>>::subscribe_new_heads(&self.smoldot_client)
.await
else {
tracing::error!(
target: LOG_TARGET,
"Unable to initialize new heads subscription"
);
return
};
let Ok(mut finalized_head_subscription) =
<JsonRpseeClient as ChainApiClient<
RelayNumber,
RelayHash,
RelayHeader,
SignedBlock<RelayBlock>,
>>::subscribe_finalized_heads(&self.smoldot_client)
.await
else {
tracing::error!(
target: LOG_TARGET,
"Unable to initialize finalized heads subscription"
);
return
};
let Ok(mut all_head_subscription) = <JsonRpseeClient as ChainApiClient<
RelayNumber,
RelayHash,
RelayHeader,
SignedBlock<RelayBlock>,
>>::subscribe_all_heads(&self.smoldot_client)
.await
else {
tracing::error!(
target: LOG_TARGET,
"Unable to initialize all heads subscription"
);
return
};
loop {
tokio::select! {
evt = self.client_receiver.recv() => match evt {
Some(RpcDispatcherMessage::RegisterBestHeadListener(tx)) => {
self.best_header_listeners.push(tx);
},
Some(RpcDispatcherMessage::RegisterImportListener(tx)) => {
self.imported_header_listeners.push(tx)
},
Some(RpcDispatcherMessage::RegisterFinalizationListener(tx)) => {
self.finalized_header_listeners.push(tx)
},
Some(RpcDispatcherMessage::Request(method, params, response_sender)) => {
let closure_client = self.smoldot_client.clone();
tracing::debug!(
target: LOG_TARGET,
len = pending_requests.len(),
method,
"Request"
);
pending_requests.push(async move {
let response = closure_client.request(&method, params).await;
tracing::debug!(
target: LOG_TARGET,
method,
?response,
"Response"
);
if let Err(err) = response_sender.send(response) {
tracing::debug!(
target: LOG_TARGET,
?err,
"Recipient no longer interested in request result"
);
};
});
},
None => {
tracing::error!(target: LOG_TARGET, "RPC client receiver closed. Stopping RPC Worker.");
return;
}
},
_ = pending_requests.next(), if !pending_requests.is_empty() => {},
import_event = all_head_subscription.next() => {
if handle_notification(import_event, &mut self.imported_header_listeners).is_err() {
return
}
},
best_header_event = new_head_subscription.next() => {
if handle_notification(best_header_event, &mut self.best_header_listeners).is_err() {
return
}
}
finalized_event = finalized_head_subscription.next() => {
if handle_notification(finalized_event, &mut self.finalized_header_listeners).is_err() {
return
}
}
}
}
}
}
@@ -15,53 +15,47 @@
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use cumulus_primitives_core::relay_chain::{
BlockNumber as RelayBlockNumber, Header as RelayHeader,
Block as RelayBlock, BlockNumber as RelayNumber, Hash as RelayHash, Header as RelayHeader,
};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
use futures::{
channel::{
mpsc::{Receiver, Sender},
oneshot::Sender as OneshotSender,
},
channel::{mpsc::Sender, oneshot::Sender as OneshotSender},
future::BoxFuture,
stream::FuturesUnordered,
FutureExt, StreamExt,
};
use jsonrpsee::{
core::{
client::{Client as JsonRpcClient, ClientT, Subscription, SubscriptionClientT},
client::{Client as JsonRpcClient, ClientT, Subscription},
params::ArrayParams,
Error as JsonRpseeError, JsonValue,
},
rpc_params,
ws_client::WsClientBuilder,
};
use lru::LruCache;
use sc_service::TaskManager;
use sc_rpc_api::chain::ChainApiClient;
use sp_runtime::generic::SignedBlock;
use std::{num::NonZeroUsize, sync::Arc};
use tokio::sync::mpsc::{
channel as tokio_channel, Receiver as TokioReceiver, Sender as TokioSender,
};
use url::Url;
const NOTIFICATION_CHANNEL_SIZE_LIMIT: usize = 20;
use crate::rpc_client::{distribute_header, RpcDispatcherMessage};
const LOG_TARGET: &str = "reconnecting-websocket-client";
/// Messages for communication between [`ReconnectingWsClient`] and [`ReconnectingWebsocketWorker`].
#[derive(Debug)]
enum RpcDispatcherMessage {
RegisterBestHeadListener(Sender<RelayHeader>),
RegisterImportListener(Sender<RelayHeader>),
RegisterFinalizationListener(Sender<RelayHeader>),
Request(String, ArrayParams, OneshotSender<Result<JsonValue, JsonRpseeError>>),
}
/// Worker that should be used in combination with [`RelayChainRpcClient`].
///
/// Must be polled to distribute header notifications to listeners.
pub struct ReconnectingWebsocketWorker {
ws_urls: Vec<String>,
/// Communication channel with the RPC client
client_receiver: TokioReceiver<RpcDispatcherMessage>,
/// Frontend for performing websocket requests.
/// Requests and stream requests are forwarded to [`ReconnectingWebsocketWorker`].
#[derive(Debug, Clone)]
pub struct ReconnectingWsClient {
/// Channel to communicate with the RPC worker
to_worker_channel: TokioSender<RpcDispatcherMessage>,
/// Senders to distribute incoming header notifications to
imported_header_listeners: Vec<Sender<RelayHeader>>,
finalized_header_listeners: Vec<Sender<RelayHeader>>,
best_header_listeners: Vec<Sender<RelayHeader>>,
}
/// Format url and force addition of a port
@@ -83,115 +77,6 @@ fn url_to_string_with_port(url: Url) -> Option<String> {
))
}
impl ReconnectingWsClient {
/// Create a new websocket client frontend.
pub async fn new(urls: Vec<Url>, task_manager: &mut TaskManager) -> RelayChainResult<Self> {
tracing::debug!(target: LOG_TARGET, "Instantiating reconnecting websocket client");
let (worker, sender) = ReconnectingWebsocketWorker::new(urls).await;
task_manager
.spawn_essential_handle()
.spawn("relay-chain-rpc-worker", None, worker.run());
Ok(Self { to_worker_channel: sender })
}
}
impl ReconnectingWsClient {
/// Perform a request via websocket connection.
pub async fn request<R>(&self, method: &str, params: ArrayParams) -> Result<R, RelayChainError>
where
R: serde::de::DeserializeOwned,
{
let (tx, rx) = futures::channel::oneshot::channel();
let message = RpcDispatcherMessage::Request(method.into(), params, tx);
self.to_worker_channel.send(message).await.map_err(|err| {
RelayChainError::WorkerCommunicationError(format!(
"Unable to send message to RPC worker: {}",
err
))
})?;
let value = rx.await.map_err(|err| {
RelayChainError::WorkerCommunicationError(format!(
"Unexpected channel close on RPC worker side: {}",
err
))
})??;
serde_json::from_value(value)
.map_err(|_| RelayChainError::GenericError("Unable to deserialize value".to_string()))
}
/// Get a stream of new best relay chain headers
pub fn get_best_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
let (tx, rx) =
futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
self.send_register_message_to_worker(RpcDispatcherMessage::RegisterBestHeadListener(tx))?;
Ok(rx)
}
/// Get a stream of finalized relay chain headers
pub fn get_finalized_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
let (tx, rx) =
futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
self.send_register_message_to_worker(RpcDispatcherMessage::RegisterFinalizationListener(
tx,
))?;
Ok(rx)
}
/// Get a stream of all imported relay chain headers
pub fn get_imported_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
let (tx, rx) =
futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
self.send_register_message_to_worker(RpcDispatcherMessage::RegisterImportListener(tx))?;
Ok(rx)
}
fn send_register_message_to_worker(
&self,
message: RpcDispatcherMessage,
) -> Result<(), RelayChainError> {
self.to_worker_channel
.try_send(message)
.map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))
}
}
/// Worker that should be used in combination with [`crate::RelayChainRpcClient`].
///
/// Must be polled to distribute header notifications to listeners.
struct ReconnectingWebsocketWorker {
ws_urls: Vec<String>,
/// Communication channel with the RPC client
client_receiver: TokioReceiver<RpcDispatcherMessage>,
/// Senders to distribute incoming header notifications to
imported_header_listeners: Vec<Sender<RelayHeader>>,
finalized_header_listeners: Vec<Sender<RelayHeader>>,
best_header_listeners: Vec<Sender<RelayHeader>>,
}
fn distribute_header(header: RelayHeader, senders: &mut Vec<Sender<RelayHeader>>) {
senders.retain_mut(|e| {
match e.try_send(header.clone()) {
// Receiver has been dropped, remove Sender from list.
Err(error) if error.is_disconnected() => false,
// Channel is full. This should not happen.
// TODO: Improve error handling here
// https://github.com/paritytech/cumulus/issues/1482
Err(error) => {
tracing::error!(target: LOG_TARGET, ?error, "Event distribution channel has reached its limit. This can lead to missed notifications.");
true
},
_ => true,
}
});
}
/// Manages the active websocket client.
/// Responsible for creating request futures, subscription streams
/// and reconnections.
@@ -248,54 +133,53 @@ impl ClientManager {
}
async fn get_subscriptions(&self) -> Result<RelayChainSubscriptions, JsonRpseeError> {
let import_subscription = self
.active_client
.subscribe::<RelayHeader, _>(
"chain_subscribeAllHeads",
rpc_params![],
"chain_unsubscribeAllHeads",
)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
?e,
"Unable to open `chain_subscribeAllHeads` subscription."
);
e
})?;
let best_subscription = self
.active_client
.subscribe::<RelayHeader, _>(
"chain_subscribeNewHeads",
rpc_params![],
"chain_unsubscribeNewHeads",
)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
?e,
"Unable to open `chain_subscribeNewHeads` subscription."
);
e
})?;
let finalized_subscription = self
.active_client
.subscribe::<RelayHeader, _>(
"chain_subscribeFinalizedHeads",
rpc_params![],
"chain_unsubscribeFinalizedHeads",
)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
?e,
"Unable to open `chain_subscribeFinalizedHeads` subscription."
);
e
})?;
let import_subscription = <JsonRpcClient as ChainApiClient<
RelayNumber,
RelayHash,
RelayHeader,
SignedBlock<RelayBlock>,
>>::subscribe_all_heads(&self.active_client)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
?e,
"Unable to open `chain_subscribeAllHeads` subscription."
);
e
})?;
let best_subscription = <JsonRpcClient as ChainApiClient<
RelayNumber,
RelayHash,
RelayHeader,
SignedBlock<RelayBlock>,
>>::subscribe_new_heads(&self.active_client)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
?e,
"Unable to open `chain_subscribeNewHeads` subscription."
);
e
})?;
let finalized_subscription = <JsonRpcClient as ChainApiClient<
RelayNumber,
RelayHash,
RelayHeader,
SignedBlock<RelayBlock>,
>>::subscribe_finalized_heads(&self.active_client)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
?e,
"Unable to open `chain_subscribeFinalizedHeads` subscription."
);
e
})?;
Ok(RelayChainSubscriptions {
import_subscription,
@@ -344,7 +228,7 @@ enum ConnectionStatus {
impl ReconnectingWebsocketWorker {
/// Create new worker. Returns the worker and a channel to register new listeners.
async fn new(
pub async fn new(
urls: Vec<Url>,
) -> (ReconnectingWebsocketWorker, TokioSender<RpcDispatcherMessage>) {
let urls = urls.into_iter().filter_map(url_to_string_with_port).collect();
@@ -410,7 +294,7 @@ impl ReconnectingWebsocketWorker {
/// the sender from the list.
/// - Find a new valid RPC server to connect to in case the websocket connection is terminated.
/// If the worker is not able to connec to an RPC server from the list, the worker shuts down.
async fn run(mut self) {
pub async fn run(mut self) {
let mut pending_requests = FuturesUnordered::new();
let urls = std::mem::take(&mut self.ws_urls);
@@ -426,7 +310,7 @@ impl ReconnectingWebsocketWorker {
let mut imported_blocks_cache =
LruCache::new(NonZeroUsize::new(40).expect("40 is nonzero; qed."));
let mut should_reconnect = ConnectionStatus::Connected;
let mut last_seen_finalized_num: RelayBlockNumber = 0;
let mut last_seen_finalized_num: RelayNumber = 0;
loop {
// This branch is taken if the websocket connection to the current RPC server is closed.
if let ConnectionStatus::ReconnectRequired(maybe_failed_request) = should_reconnect {
@@ -14,19 +14,19 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use futures::channel::mpsc::Receiver;
use jsonrpsee::{core::params::ArrayParams, rpc_params};
use parity_scale_codec::{Decode, Encode};
use futures::channel::{
mpsc::{Receiver, Sender},
oneshot::Sender as OneshotSender,
};
use jsonrpsee::{
core::{params::ArrayParams, Error as JsonRpseeError},
rpc_params,
};
use serde::de::DeserializeOwned;
pub use url::Url;
use serde_json::Value as JsonValue;
use tokio::sync::mpsc::Sender as TokioSender;
use sc_client_api::StorageData;
use sc_rpc_api::{state::ReadProof, system::Health};
use sc_service::TaskManager;
use sp_api::RuntimeVersion;
use sp_consensus_babe::Epoch;
use sp_core::sp_std::collections::btree_map::BTreeMap;
use sp_storage::StorageKey;
use parity_scale_codec::{Decode, Encode};
use cumulus_primitives_core::{
relay_chain::{
@@ -42,35 +42,96 @@ use cumulus_primitives_core::{
};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
use crate::reconnecting_ws_client::ReconnectingWsClient;
use sc_client_api::StorageData;
use sc_rpc_api::{state::ReadProof, system::Health};
use sc_service::TaskManager;
use sp_api::RuntimeVersion;
use sp_consensus_babe::Epoch;
use sp_core::sp_std::collections::btree_map::BTreeMap;
use sp_storage::StorageKey;
use crate::{
light_client_worker::{build_smoldot_client, LightClientRpcWorker},
reconnecting_ws_client::ReconnectingWebsocketWorker,
};
pub use url::Url;
const LOG_TARGET: &str = "relay-chain-rpc-client";
const NOTIFICATION_CHANNEL_SIZE_LIMIT: usize = 20;
/// Client that maps RPC methods and deserializes results
#[derive(Clone)]
pub struct RelayChainRpcClient {
/// Websocket client to make calls
ws_client: ReconnectingWsClient,
/// Messages for communication between [`RelayChainRpcClient`] and the RPC workers.
#[derive(Debug)]
pub enum RpcDispatcherMessage {
/// Register new listener for the best headers stream. Contains a sender which will be used
/// to send incoming headers.
RegisterBestHeadListener(Sender<RelayHeader>),
/// Register new listener for the import headers stream. Contains a sender which will be used
/// to send incoming headers.
RegisterImportListener(Sender<RelayHeader>),
/// Register new listener for the finalized headers stream. Contains a sender which will be
/// used to send incoming headers.
RegisterFinalizationListener(Sender<RelayHeader>),
/// Register new listener for the finalized headers stream.
/// Contains the following:
/// - [`String`] representing the RPC method to be called
/// - [`ArrayParams`] for the parameters to the RPC call
/// - [`OneshotSender`] for the return value of the request
Request(String, ArrayParams, OneshotSender<Result<JsonValue, JsonRpseeError>>),
}
/// Entry point to create [`RelayChainRpcClient`] and start a worker that distributes notifications.
/// Entry point to create [`RelayChainRpcClient`] and start a worker that communicates
/// to JsonRPC servers over the network.
pub async fn create_client_and_start_worker(
urls: Vec<Url>,
task_manager: &mut TaskManager,
) -> RelayChainResult<RelayChainRpcClient> {
let ws_client = ReconnectingWsClient::new(urls, task_manager).await?;
let (worker, sender) = ReconnectingWebsocketWorker::new(urls).await;
let client = RelayChainRpcClient::new(ws_client).await?;
task_manager
.spawn_essential_handle()
.spawn("relay-chain-rpc-worker", None, worker.run());
let client = RelayChainRpcClient::new(sender);
Ok(client)
}
/// Entry point to create [`RelayChainRpcClient`] and start a worker that communicates
/// with an embedded smoldot instance.
pub async fn create_client_and_start_light_client_worker(
chain_spec: String,
task_manager: &mut TaskManager,
) -> RelayChainResult<RelayChainRpcClient> {
let (client, chain_id, json_rpc_responses) =
build_smoldot_client(task_manager.spawn_handle(), &chain_spec).await?;
let (worker, sender) = LightClientRpcWorker::new(client, json_rpc_responses, chain_id);
task_manager
.spawn_essential_handle()
.spawn("relay-light-client-worker", None, worker.run());
let client = RelayChainRpcClient::new(sender);
Ok(client)
}
/// Client that maps RPC methods and deserializes results
#[derive(Clone)]
pub struct RelayChainRpcClient {
/// Sender to send messages to the worker.
worker_channel: TokioSender<RpcDispatcherMessage>,
}
impl RelayChainRpcClient {
/// Initialize new RPC Client.
async fn new(ws_client: ReconnectingWsClient) -> RelayChainResult<Self> {
let client = RelayChainRpcClient { ws_client };
Ok(client)
///
/// This client expects a channel connected to a worker that processes
/// requests sent via this channel.
pub(crate) fn new(worker_channel: TokioSender<RpcDispatcherMessage>) -> Self {
RelayChainRpcClient { worker_channel }
}
/// Call a call to `state_call` rpc method.
@@ -129,8 +190,25 @@ impl RelayChainRpcClient {
R: DeserializeOwned + std::fmt::Debug,
OR: Fn(&RelayChainError),
{
self.ws_client.request(method, params).await.map_err(|err| {
trace_error(&err);
let (tx, rx) = futures::channel::oneshot::channel();
let message = RpcDispatcherMessage::Request(method.into(), params, tx);
self.worker_channel.send(message).await.map_err(|err| {
RelayChainError::WorkerCommunicationError(format!(
"Unable to send message to RPC worker: {}",
err
))
})?;
let value = rx.await.map_err(|err| {
RelayChainError::WorkerCommunicationError(format!(
"Unexpected channel close on RPC worker side: {}",
err
))
})??;
serde_json::from_value(value).map_err(|_| {
trace_error(&RelayChainError::GenericError("Unable to deserialize value".to_string()));
RelayChainError::RpcCallError(method.to_string())
})
}
@@ -537,18 +615,57 @@ impl RelayChainRpcClient {
.await
}
fn send_register_message_to_worker(
&self,
message: RpcDispatcherMessage,
) -> Result<(), RelayChainError> {
self.worker_channel
.try_send(message)
.map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))
}
/// Get a stream of all imported relay chain headers
pub fn get_imported_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
self.ws_client.get_imported_heads_stream()
let (tx, rx) =
futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
self.send_register_message_to_worker(RpcDispatcherMessage::RegisterImportListener(tx))?;
Ok(rx)
}
/// Get a stream of new best relay chain headers
pub fn get_best_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
self.ws_client.get_best_heads_stream()
let (tx, rx) =
futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
self.send_register_message_to_worker(RpcDispatcherMessage::RegisterBestHeadListener(tx))?;
Ok(rx)
}
/// Get a stream of finalized relay chain headers
pub fn get_finalized_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
self.ws_client.get_finalized_heads_stream()
let (tx, rx) =
futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
self.send_register_message_to_worker(RpcDispatcherMessage::RegisterFinalizationListener(
tx,
))?;
Ok(rx)
}
}
/// Send `header` through all channels contained in `senders`.
/// If no one is listening to the sender, it is removed from the vector.
pub fn distribute_header(header: RelayHeader, senders: &mut Vec<Sender<RelayHeader>>) {
senders.retain_mut(|e| {
match e.try_send(header.clone()) {
// Receiver has been dropped, remove Sender from list.
Err(error) if error.is_disconnected() => false,
// Channel is full. This should not happen.
// TODO: Improve error handling here
// https://github.com/paritytech/cumulus/issues/1482
Err(error) => {
tracing::error!(target: LOG_TARGET, ?error, "Event distribution channel has reached its limit. This can lead to missed notifications.");
true
},
_ => true,
}
});
}
@@ -0,0 +1,211 @@
// Copyright 2023 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use core::time::Duration;
use futures::prelude::*;
use sc_service::SpawnTaskHandle;
use smoldot::libp2p::{websocket, with_buffers};
use smoldot_light::platform::{
Address, ConnectError, ConnectionType, IpAddr, MultiStreamWebRtcConnection, PlatformRef,
SubstreamDirection,
};
use std::{net::SocketAddr, pin::Pin, time::Instant};
use tokio::net::TcpStream;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
type CompatTcpStream = Compat<TcpStream>;
/// Platform implementation for tokio
/// This implementation is a port of the implementation for smol:
/// https://github.com/smol-dot/smoldot/blob/8c577b4a753fe96190f813070564ecc742b91a16/light-base/src/platform/default.rs
#[derive(Clone)]
pub struct TokioPlatform {
spawner: SpawnTaskHandle,
}
impl TokioPlatform {
pub fn new(spawner: SpawnTaskHandle) -> Self {
TokioPlatform { spawner }
}
}
impl PlatformRef for TokioPlatform {
type Delay = future::BoxFuture<'static, ()>;
type Instant = Instant;
type MultiStream = std::convert::Infallible;
type Stream = Stream;
type StreamConnectFuture = future::BoxFuture<'static, Result<Self::Stream, ConnectError>>;
type MultiStreamConnectFuture = future::BoxFuture<
'static,
Result<MultiStreamWebRtcConnection<Self::MultiStream>, ConnectError>,
>;
type ReadWriteAccess<'a> = with_buffers::ReadWriteAccess<'a>;
type StreamUpdateFuture<'a> = future::BoxFuture<'a, ()>;
type StreamErrorRef<'a> = &'a std::io::Error;
type NextSubstreamFuture<'a> = future::Pending<Option<(Self::Stream, SubstreamDirection)>>;
fn now_from_unix_epoch(&self) -> Duration {
// Intentionally panic if the time is configured earlier than the UNIX EPOCH.
std::time::UNIX_EPOCH.elapsed().unwrap()
}
fn now(&self) -> Self::Instant {
Instant::now()
}
fn fill_random_bytes(&self, buffer: &mut [u8]) {
rand::RngCore::fill_bytes(&mut rand::thread_rng(), buffer);
}
fn sleep(&self, duration: Duration) -> Self::Delay {
tokio::time::sleep(duration).boxed()
}
fn sleep_until(&self, when: Self::Instant) -> Self::Delay {
let duration = when.saturating_duration_since(Instant::now());
self.sleep(duration)
}
fn supports_connection_type(&self, connection_type: ConnectionType) -> bool {
matches!(
connection_type,
ConnectionType::TcpIpv4 |
ConnectionType::TcpIpv6 |
ConnectionType::TcpDns |
ConnectionType::WebSocketIpv4 { .. } |
ConnectionType::WebSocketIpv6 { .. } |
ConnectionType::WebSocketDns { secure: false, .. }
)
}
fn connect_stream(&self, multiaddr: Address) -> Self::StreamConnectFuture {
let (tcp_socket_addr, host_if_websocket): (
either::Either<SocketAddr, (String, u16)>,
Option<String>,
) = match multiaddr {
Address::TcpDns { hostname, port } =>
(either::Right((hostname.to_string(), port)), None),
Address::TcpIp { ip: IpAddr::V4(ip), port } =>
(either::Left(SocketAddr::from((ip, port))), None),
Address::TcpIp { ip: IpAddr::V6(ip), port } =>
(either::Left(SocketAddr::from((ip, port))), None),
Address::WebSocketDns { hostname, port, secure: false } => (
either::Right((hostname.to_string(), port)),
Some(format!("{}:{}", hostname, port)),
),
Address::WebSocketIp { ip: IpAddr::V4(ip), port } => {
let addr = SocketAddr::from((ip, port));
(either::Left(addr), Some(addr.to_string()))
},
Address::WebSocketIp { ip: IpAddr::V6(ip), port } => {
let addr = SocketAddr::from((ip, port));
(either::Left(addr), Some(addr.to_string()))
},
// The API user of the `PlatformRef` trait is never supposed to open connections of
// a type that isn't supported.
_ => unreachable!(),
};
Box::pin(async move {
let tcp_socket = match tcp_socket_addr {
either::Left(socket_addr) => TcpStream::connect(socket_addr).await,
either::Right((dns, port)) => TcpStream::connect((&dns[..], port)).await,
};
if let Ok(tcp_socket) = &tcp_socket {
let _ = tcp_socket.set_nodelay(true);
}
let socket: TcpOrWs = match (tcp_socket, host_if_websocket) {
(Ok(tcp_socket), Some(host)) => future::Either::Right(
websocket::websocket_client_handshake(websocket::Config {
tcp_socket: tcp_socket.compat(),
host: &host,
url: "/",
})
.await
.map_err(|err| ConnectError {
message: format!("Failed to negotiate WebSocket: {err}"),
})?,
),
(Ok(tcp_socket), None) => future::Either::Left(tcp_socket.compat()),
(Err(err), _) =>
return Err(ConnectError { message: format!("Failed to reach peer: {err}") }),
};
Ok(Stream(with_buffers::WithBuffers::new(socket)))
})
}
fn open_out_substream(&self, _c: &mut Self::MultiStream) {
// This function can only be called with so-called "multi-stream" connections. We never
// open such connection.
}
fn next_substream<'a>(&self, c: &'a mut Self::MultiStream) -> Self::NextSubstreamFuture<'a> {
// This function can only be called with so-called "multi-stream" connections. We never
// open such connection.
match *c {}
}
fn spawn_task(
&self,
_: std::borrow::Cow<str>,
task: impl Future<Output = ()> + Send + 'static,
) {
self.spawner.spawn("cumulus-internal-light-client-task", None, task)
}
fn client_name(&self) -> std::borrow::Cow<str> {
"cumulus-relay-chain-light-client".into()
}
fn client_version(&self) -> std::borrow::Cow<str> {
env!("CARGO_PKG_VERSION").into()
}
fn connect_multistream(
&self,
_address: smoldot_light::platform::MultiStreamAddress,
) -> Self::MultiStreamConnectFuture {
unimplemented!("Multistream not supported!")
}
fn read_write_access<'a>(
&self,
stream: Pin<&'a mut Self::Stream>,
) -> Result<Self::ReadWriteAccess<'a>, &'a std::io::Error> {
let stream = stream.project();
stream.0.read_write_access(Instant::now())
}
fn wait_read_write_again<'a>(
&self,
stream: Pin<&'a mut Self::Stream>,
) -> Self::StreamUpdateFuture<'a> {
let stream = stream.project();
Box::pin(stream.0.wait_read_write_again(|when| async move {
tokio::time::sleep_until(when.into()).await;
}))
}
}
type TcpOrWs = future::Either<CompatTcpStream, websocket::Connection<CompatTcpStream>>;
/// Implementation detail of [`TokioPlatform`].
#[pin_project::pin_project]
pub struct Stream(#[pin] with_buffers::WithBuffers<TcpOrWs>);