mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 04:01:02 +00:00
Move substrate-bridge-relay into repository (#1)
* Initial commit. CLI which parses RPC urls. * Establish ws connections and make simple RPC requests. * Complete bridge setup. * Process subscription events. * Ctrl-C handler. * Write a bare-bones README and copy in design doc. * Modularize code a little bit. * Communicate with each chain in a separate task. * Parse headers from RPC subscription notifications. * Send (fake) extrinsics across bridge channels. And now it's deadlocked. * Fix deadlock. * Clarify in README that this is not-in-progress. * Move everything into a single folder * Move Substrate relay into appropriate folder * Get the Substrate Relay node compiling * Update Cargo.lock * Use new composite accounts from Substrate * Remove specification document It has been moved to the Wiki on the Github repo. * Update author + remove comments * Use latest master for jsonrpsee Required renaming some stuff (e.g Client -> RawClient) Co-authored-by: Jim Posen <jim.posen@gmail.com>
This commit is contained in:
committed by
Bastian Köcher
parent
7ef276daba
commit
ebbc4724d0
@@ -247,6 +247,9 @@ mod tests {
|
|||||||
type MaximumBlockLength = ();
|
type MaximumBlockLength = ();
|
||||||
type Version = ();
|
type Version = ();
|
||||||
type ModuleToIndex = ();
|
type ModuleToIndex = ();
|
||||||
|
type AccountData = ();
|
||||||
|
type OnNewAccount = ();
|
||||||
|
type OnReapAccount = ();
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Trait for Test {}
|
impl Trait for Test {}
|
||||||
|
|||||||
@@ -0,0 +1,20 @@
|
|||||||
|
[package]
|
||||||
|
name = "substrate-bridge"
|
||||||
|
version = "0.1.0"
|
||||||
|
authors = ["Parity Technologies <admin@parity.io>"]
|
||||||
|
edition = "2018"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
async-std = "1.0.1"
|
||||||
|
clap = "2.3.3"
|
||||||
|
ctrlc = "3.1.3"
|
||||||
|
derive_more = "0.99.1"
|
||||||
|
env_logger = "0.7.1"
|
||||||
|
futures = "0.3.1"
|
||||||
|
jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee", features = ["ws"] }
|
||||||
|
log = "0.4.8"
|
||||||
|
node-primitives = { version = "2.0.0", git = "https://github.com/paritytech/substrate" }
|
||||||
|
serde_json = "1.0.41"
|
||||||
|
sp-core = { version = "2.0.0", git = "https://github.com/paritytech/substrate.git" }
|
||||||
|
sp-rpc = { version = "2.0.0", git = "https://github.com/paritytech/substrate.git" }
|
||||||
|
url = "2.1.0"
|
||||||
@@ -0,0 +1,32 @@
|
|||||||
|
# Substrate-to-Substrate Bridge Relay
|
||||||
|
|
||||||
|
The bridge relay is a process that connects to running Substrate nodes and sends data over the Substrate-to-Substrate bridge. The process communicates with the nodes over the JSON-RPC interface and reads data from the relays information required by the `bridge` pallet using runtime calls and writes data to the modules by constructing and submitting extrinsics.
|
||||||
|
|
||||||
|
For more details, see the [design document](doc/design.md).
|
||||||
|
|
||||||
|
## Status
|
||||||
|
|
||||||
|
This is a not-in-progress prototype.
|
||||||
|
|
||||||
|
## Running in development
|
||||||
|
|
||||||
|
Run two development Substrate chains:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
> TMPDIR=(mktemp -d)
|
||||||
|
> cd $TMPDIR
|
||||||
|
> substrate build-spec --dev > red-spec.json
|
||||||
|
> cp red-spec.json blue-spec.json
|
||||||
|
# Modify the chain spec in an editor so that the genesis hashes of the two chains differ.
|
||||||
|
# For example, double one of the balances in '$.genesis.runtime.balances.balances'.
|
||||||
|
> substrate --chain red-spec.json --alice --base-path ./red --port 30343 --ws-port 9954
|
||||||
|
> substrate --chain blue-spec.json --alice --base-path ./blue --port 30353 --ws-port 9964
|
||||||
|
```
|
||||||
|
|
||||||
|
Now run the bridge relay:
|
||||||
|
|
||||||
|
```
|
||||||
|
> target/release/substrate-bridge --base-path ./relay \
|
||||||
|
--rpc-url ws://localhost:9954 \
|
||||||
|
--rpc-url ws://localhost:9964
|
||||||
|
```
|
||||||
@@ -0,0 +1,391 @@
|
|||||||
|
use crate::error::Error;
|
||||||
|
use crate::rpc::{self, SubstrateRPC};
|
||||||
|
use crate::params::{RPCUrlParam, Params};
|
||||||
|
|
||||||
|
use futures::{prelude::*, channel::{mpsc, oneshot}, future, select};
|
||||||
|
use jsonrpsee::{
|
||||||
|
core::client::{RawClientError, RawClientEvent, RawClientRequestId, RawClientSubscription},
|
||||||
|
ws::{WsRawClient, WsConnecError, ws_raw_client},
|
||||||
|
};
|
||||||
|
use node_primitives::{Hash, Header};
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use sp_core::Bytes;
|
||||||
|
|
||||||
|
type ChainId = Hash;
|
||||||
|
|
||||||
|
struct BridgeState {
|
||||||
|
channel: mpsc::Sender<Event>,
|
||||||
|
locally_finalized_head_on_bridged_chain: Header,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ChainState {
|
||||||
|
current_finalized_head: Header,
|
||||||
|
bridges: HashMap<ChainId, BridgeState>,
|
||||||
|
}
|
||||||
|
|
||||||
|
enum Event {
|
||||||
|
SubmitExtrinsic(Bytes),
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Chain {
|
||||||
|
url: String,
|
||||||
|
client: WsRawClient,
|
||||||
|
sender: mpsc::Sender<Event>,
|
||||||
|
receiver: mpsc::Receiver<Event>,
|
||||||
|
genesis_hash: Hash,
|
||||||
|
state: ChainState,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn init_rpc_connection(url: &RPCUrlParam) -> Result<Chain, Error> {
|
||||||
|
let url_str = url.to_string();
|
||||||
|
log::debug!("Connecting to {}", url_str);
|
||||||
|
|
||||||
|
// Skip the leading "ws://" and trailing "/".
|
||||||
|
let url_without_scheme = &url_str[5..(url_str.len() - 1)];
|
||||||
|
let mut client = ws_raw_client(url_without_scheme)
|
||||||
|
.await
|
||||||
|
.map_err(|err| Error::WsConnectionError(err.to_string()))?;
|
||||||
|
|
||||||
|
let genesis_hash = rpc::genesis_block_hash(&mut client)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::RPCError(e.to_string()))?
|
||||||
|
.ok_or_else(|| Error::InvalidChainState(format!(
|
||||||
|
"chain with RPC URL {} is missing a genesis block hash",
|
||||||
|
url_str,
|
||||||
|
)))?;
|
||||||
|
|
||||||
|
let latest_finalized_hash = SubstrateRPC::chain_finalized_head(&mut client)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::RPCError(e.to_string()))?;
|
||||||
|
let latest_finalized_header = SubstrateRPC::chain_header(
|
||||||
|
&mut client,
|
||||||
|
Some(latest_finalized_hash)
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::RPCError(e.to_string()))?
|
||||||
|
.ok_or_else(|| Error::InvalidChainState(format!(
|
||||||
|
"chain {} is missing header for finalized block hash {}",
|
||||||
|
genesis_hash, latest_finalized_hash
|
||||||
|
)))?;
|
||||||
|
|
||||||
|
let (sender, receiver) = mpsc::channel(0);
|
||||||
|
|
||||||
|
Ok(Chain {
|
||||||
|
url: url_str,
|
||||||
|
client,
|
||||||
|
sender,
|
||||||
|
receiver,
|
||||||
|
genesis_hash,
|
||||||
|
state: ChainState {
|
||||||
|
current_finalized_head: latest_finalized_header,
|
||||||
|
bridges: HashMap::new(),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns IDs of the bridged chains.
|
||||||
|
async fn read_bridges(chain: &mut Chain, chain_ids: &[Hash])
|
||||||
|
-> Result<Vec<Hash>, Error>
|
||||||
|
{
|
||||||
|
// This should make an RPC call to read this information from the bridge pallet state.
|
||||||
|
// For now, just pretend every chain is bridged to every other chain.
|
||||||
|
//
|
||||||
|
// TODO: The correct thing.
|
||||||
|
Ok(
|
||||||
|
chain_ids
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.filter(|&chain_id| chain_id != chain.genesis_hash)
|
||||||
|
.collect()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run_async(
|
||||||
|
params: Params,
|
||||||
|
exit: Box<dyn Future<Output=()> + Unpin + Send>
|
||||||
|
) -> Result<(), Error>
|
||||||
|
{
|
||||||
|
let chains = init_chains(¶ms).await?;
|
||||||
|
|
||||||
|
let (chain_tasks, exit_signals) = chains.into_iter()
|
||||||
|
.map(|(chain_id, chain_cell)| {
|
||||||
|
let chain = chain_cell.into_inner();
|
||||||
|
let (task_exit_signal, task_exit_receiver) = oneshot::channel();
|
||||||
|
let task_exit = Box::new(task_exit_receiver.map(|result| {
|
||||||
|
result.expect("task_exit_signal is not dropped before send() is called")
|
||||||
|
}));
|
||||||
|
let chain_task = async_std::task::spawn(async move {
|
||||||
|
if let Err(err) = chain_task(chain_id, chain, task_exit).await {
|
||||||
|
log::error!("Error in task for chain {}: {}", chain_id, err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
(chain_task, task_exit_signal)
|
||||||
|
})
|
||||||
|
.unzip::<_, _, Vec<_>, Vec<_>>();
|
||||||
|
|
||||||
|
async_std::task::spawn(async move {
|
||||||
|
exit.await;
|
||||||
|
for exit_signal in exit_signals {
|
||||||
|
let _ = exit_signal.send(());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
future::join_all(chain_tasks).await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn initial_next_events<'a>(chains: &'a HashMap<ChainId, RefCell<Chain>>)
|
||||||
|
-> Vec<Pin<Box<dyn Future<Output=Result<(ChainId, RawClientEvent), Error>> + 'a>>>
|
||||||
|
{
|
||||||
|
chains.values()
|
||||||
|
.map(|chain_cell| async move {
|
||||||
|
let mut chain = chain_cell.borrow_mut();
|
||||||
|
let event = chain.client.next_event()
|
||||||
|
.await
|
||||||
|
.map_err(|err| Error::RPCError(err.to_string()))?;
|
||||||
|
Ok((chain.genesis_hash, event))
|
||||||
|
})
|
||||||
|
.map(|fut| Box::pin(fut) as Pin<Box<dyn Future<Output=_>>>)
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn next_event<'a>(
|
||||||
|
next_events: Vec<Pin<Box<dyn Future<Output=Result<(ChainId, RawClientEvent), Error>> + 'a>>>,
|
||||||
|
chains: &'a HashMap<ChainId, RefCell<Chain>>,
|
||||||
|
)
|
||||||
|
-> (
|
||||||
|
Result<(Hash, RawClientEvent), Error>,
|
||||||
|
Vec<Pin<Box<dyn Future<Output=Result<(ChainId, RawClientEvent), Error>> +'a>>>
|
||||||
|
)
|
||||||
|
{
|
||||||
|
let (result, _, mut rest) = future::select_all(next_events).await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok((chain_id, _)) => {
|
||||||
|
let fut = async move {
|
||||||
|
let chain_cell = chains.get(&chain_id)
|
||||||
|
.expect("chain must be in the map as a function precondition; qed");
|
||||||
|
let mut chain = chain_cell.borrow_mut();
|
||||||
|
let event = chain.client.next_event()
|
||||||
|
.await
|
||||||
|
.map_err(|err| Error::RPCError(err.to_string()))?;
|
||||||
|
Ok((chain_id, event))
|
||||||
|
};
|
||||||
|
rest.push(Box::pin(fut));
|
||||||
|
}
|
||||||
|
Err(ref err) => log::warn!("error in RPC connection with a chain: {}", err),
|
||||||
|
}
|
||||||
|
|
||||||
|
(result, rest)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn init_chains(params: &Params) -> Result<HashMap<ChainId, RefCell<Chain>>, Error> {
|
||||||
|
let chains = future::join_all(params.rpc_urls.iter().map(init_rpc_connection))
|
||||||
|
.await
|
||||||
|
.into_iter()
|
||||||
|
.map(|result| result.map(|chain| (chain.genesis_hash, RefCell::new(chain))))
|
||||||
|
.collect::<Result<HashMap<_, _>, _>>()?;
|
||||||
|
|
||||||
|
// TODO: Remove when read_bridges is implemented correctly.
|
||||||
|
let chain_ids = chains.keys()
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
// let chain_ids_slice = chain_ids.as_slice();
|
||||||
|
|
||||||
|
for (&chain_id, chain_cell) in chains.iter() {
|
||||||
|
let mut chain = chain_cell.borrow_mut();
|
||||||
|
for bridged_chain_id in read_bridges(&mut chain, &chain_ids).await? {
|
||||||
|
if chain_id == bridged_chain_id {
|
||||||
|
log::warn!("chain {} has a bridge to itself", chain_id);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(bridged_chain_cell) = chains.get(&bridged_chain_id) {
|
||||||
|
let bridged_chain = bridged_chain_cell.borrow_mut();
|
||||||
|
|
||||||
|
// TODO: Get this from RPC to runtime API.
|
||||||
|
let genesis_head = SubstrateRPC::chain_header(&mut chain.client, chain_id)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::RPCError(e.to_string()))?
|
||||||
|
.ok_or_else(|| Error::InvalidChainState(format!(
|
||||||
|
"chain {} is missing a genesis block header", chain_id
|
||||||
|
)))?;
|
||||||
|
|
||||||
|
let channel = chain.sender.clone();
|
||||||
|
chain.state.bridges.insert(bridged_chain_id, BridgeState {
|
||||||
|
channel,
|
||||||
|
locally_finalized_head_on_bridged_chain: genesis_head,
|
||||||
|
});
|
||||||
|
|
||||||
|
// The conditional ensures that we don't log twice per pair of chains.
|
||||||
|
if chain_id.as_ref() < bridged_chain_id.as_ref() {
|
||||||
|
log::info!("initialized bridge between {} and {}", chain_id, bridged_chain_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(chains)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn setup_subscriptions(chain: &mut Chain)
|
||||||
|
-> Result<(RawClientRequestId, RawClientRequestId), RawClientError<WsConnecError>>
|
||||||
|
{
|
||||||
|
let new_heads_subscription_id = chain.client
|
||||||
|
.start_subscription(
|
||||||
|
"chain_subscribeNewHeads",
|
||||||
|
jsonrpsee::core::common::Params::None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(RawClientError::Inner)?;
|
||||||
|
|
||||||
|
let finalized_heads_subscription_id = chain.client
|
||||||
|
.start_subscription(
|
||||||
|
"chain_subscribeFinalizedHeads",
|
||||||
|
jsonrpsee::core::common::Params::None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(RawClientError::Inner)?;
|
||||||
|
|
||||||
|
let new_heads_subscription =
|
||||||
|
chain.client.subscription_by_id(new_heads_subscription_id)
|
||||||
|
.expect("subscription_id was returned from start_subscription above; qed");
|
||||||
|
let new_heads_subscription = match new_heads_subscription {
|
||||||
|
RawClientSubscription::Active(_) => {}
|
||||||
|
RawClientSubscription::Pending(subscription) => {
|
||||||
|
subscription.wait().await?;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let finalized_heads_subscription =
|
||||||
|
chain.client.subscription_by_id(finalized_heads_subscription_id)
|
||||||
|
.expect("subscription_id was returned from start_subscription above; qed");
|
||||||
|
let finalized_heads_subscription = match finalized_heads_subscription {
|
||||||
|
RawClientSubscription::Active(subscription) => {}
|
||||||
|
RawClientSubscription::Pending(subscription) => {
|
||||||
|
subscription.wait().await?;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((new_heads_subscription_id, finalized_heads_subscription_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_rpc_event(
|
||||||
|
chain_id: ChainId,
|
||||||
|
chain: &mut Chain,
|
||||||
|
event: RawClientEvent,
|
||||||
|
new_heads_subscription_id: RawClientRequestId,
|
||||||
|
finalized_heads_subscription_id: RawClientRequestId,
|
||||||
|
) -> Result<(), Error>
|
||||||
|
{
|
||||||
|
match event {
|
||||||
|
RawClientEvent::SubscriptionNotif { request_id, result } =>
|
||||||
|
if request_id == new_heads_subscription_id {
|
||||||
|
let header: Header = serde_json::from_value(result)
|
||||||
|
.map_err(Error::SerializationError)?;
|
||||||
|
log::info!("Received new head {:?} on chain {}", header, chain_id);
|
||||||
|
} else if request_id == finalized_heads_subscription_id {
|
||||||
|
let header: Header = serde_json::from_value(result)
|
||||||
|
.map_err(Error::SerializationError)?;
|
||||||
|
log::info!("Received finalized head {:?} on chain {}", header, chain_id);
|
||||||
|
|
||||||
|
// let old_finalized_head = chain_state.current_finalized_head;
|
||||||
|
chain.state.current_finalized_head = header;
|
||||||
|
for (bridged_chain_id, bridged_chain) in chain.state.bridges.iter_mut() {
|
||||||
|
if bridged_chain.locally_finalized_head_on_bridged_chain.number <
|
||||||
|
chain.state.current_finalized_head.number {
|
||||||
|
// Craft and submit an extrinsic over RPC
|
||||||
|
log::info!("Sending command to submit extrinsic to chain {}", chain_id);
|
||||||
|
let mut send_event = bridged_chain.channel
|
||||||
|
.send(Event::SubmitExtrinsic(Bytes(Vec::new())))
|
||||||
|
.fuse();
|
||||||
|
|
||||||
|
// Continue processing events from other chain tasks while waiting to send
|
||||||
|
// event to other chain task in order to prevent deadlocks.
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
result = send_event => {
|
||||||
|
result.map_err(Error::ChannelError)?;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
event = chain.receiver.next().fuse() => {
|
||||||
|
let event = event
|
||||||
|
.expect("stream will never close as the chain has an mpsc Sender");
|
||||||
|
handle_bridge_event(chain_id, &mut chain.client, event)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
// TODO: exit
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err(Error::RPCError(format!(
|
||||||
|
"unexpected subscription response with request ID {:?}", request_id
|
||||||
|
)));
|
||||||
|
},
|
||||||
|
_ => return Err(Error::RPCError(format!(
|
||||||
|
"unexpected RPC event from chain {}: {:?}", chain_id, event
|
||||||
|
))),
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let's say this never sends over a channel (ie. cannot block on another task).
|
||||||
|
async fn handle_bridge_event(
|
||||||
|
chain_id: ChainId,
|
||||||
|
rpc_client: &mut WsRawClient,
|
||||||
|
event: Event,
|
||||||
|
) -> Result<(), Error>
|
||||||
|
{
|
||||||
|
match event {
|
||||||
|
Event::SubmitExtrinsic(data) => {
|
||||||
|
log::info!("Submitting extrinsic to chain {}", chain_id);
|
||||||
|
if let Err(err) = SubstrateRPC::author_submit_extrinsic(rpc_client, data).await {
|
||||||
|
log::error!("failed to submit extrinsic: {}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn chain_task(
|
||||||
|
chain_id: ChainId,
|
||||||
|
mut chain: Chain,
|
||||||
|
exit: impl Future<Output=()> + Unpin + Send
|
||||||
|
) -> Result<(), Error>
|
||||||
|
{
|
||||||
|
let (new_heads_subscription_id, finalized_heads_subscription_id) =
|
||||||
|
setup_subscriptions(&mut chain)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::RPCError(e.to_string()))?;
|
||||||
|
|
||||||
|
let mut exit = exit.fuse();
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
result = chain.client.next_event().fuse() => {
|
||||||
|
let event = result.map_err(|e| Error::RPCError(e.to_string()))?;
|
||||||
|
handle_rpc_event(
|
||||||
|
chain_id,
|
||||||
|
&mut chain,
|
||||||
|
event,
|
||||||
|
new_heads_subscription_id,
|
||||||
|
finalized_heads_subscription_id,
|
||||||
|
).await?;
|
||||||
|
}
|
||||||
|
event = chain.receiver.next().fuse() => {
|
||||||
|
let event = event
|
||||||
|
.expect("stream will never close as the chain has an mpsc Sender");
|
||||||
|
handle_bridge_event(chain_id, &mut chain.client, event)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
_ = exit => {
|
||||||
|
log::debug!("Received exit signal, shutting down task for chain {}", chain_id);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -0,0 +1,31 @@
|
|||||||
|
use futures::channel::mpsc;
|
||||||
|
|
||||||
|
#[derive(Debug, derive_more::Display)]
|
||||||
|
pub enum Error {
|
||||||
|
#[display(fmt = "invalid RPC URL: {}", _0)]
|
||||||
|
UrlError(String),
|
||||||
|
#[display(fmt = "RPC response indicates invalid chain state: {}", _0)]
|
||||||
|
InvalidChainState(String),
|
||||||
|
#[display(fmt = "could not make RPC call: {}", _0)]
|
||||||
|
RPCError(String),
|
||||||
|
#[display(fmt = "could not connect to RPC URL: {}", _0)]
|
||||||
|
WsConnectionError(String),
|
||||||
|
#[display(fmt = "unexpected client event from RPC URL {}: {:?}", _0, _1)]
|
||||||
|
UnexpectedClientEvent(String, String),
|
||||||
|
#[display(fmt = "serialization error: {}", _0)]
|
||||||
|
SerializationError(serde_json::error::Error),
|
||||||
|
#[display(fmt = "invalid event received from bridged chain: {}", _0)]
|
||||||
|
InvalidBridgeEvent(String),
|
||||||
|
#[display(fmt = "error sending over MPSC channel: {}", _0)]
|
||||||
|
ChannelError(mpsc::SendError),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for Error {
|
||||||
|
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||||
|
match self {
|
||||||
|
Error::SerializationError(err) => Some(err),
|
||||||
|
Error::ChannelError(err) => Some(err),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,80 @@
|
|||||||
|
mod bridge;
|
||||||
|
mod error;
|
||||||
|
mod params;
|
||||||
|
mod rpc;
|
||||||
|
|
||||||
|
use bridge::run_async;
|
||||||
|
use params::{Params, RPCUrlParam};
|
||||||
|
|
||||||
|
use clap::{App, Arg, value_t, values_t};
|
||||||
|
use futures::{prelude::*, channel};
|
||||||
|
use std::cell::Cell;
|
||||||
|
use std::process;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let params = parse_args();
|
||||||
|
env_logger::init();
|
||||||
|
let exit = setup_exit_handler();
|
||||||
|
|
||||||
|
let result = async_std::task::block_on(async move {
|
||||||
|
run_async(params, exit).await
|
||||||
|
});
|
||||||
|
if let Err(err) = result {
|
||||||
|
log::error!("{}", err);
|
||||||
|
process::exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_args() -> Params {
|
||||||
|
let matches = App::new("substrate-bridge")
|
||||||
|
.version("1.0")
|
||||||
|
.author("Parity Technologies")
|
||||||
|
.about("Bridges Substrates, duh")
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("base-path")
|
||||||
|
.long("base-path")
|
||||||
|
.value_name("DIRECTORY")
|
||||||
|
.required(true)
|
||||||
|
.help("Sets the base path")
|
||||||
|
.takes_value(true),
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("rpc-url")
|
||||||
|
.long("rpc-url")
|
||||||
|
.value_name("HOST[:PORT]")
|
||||||
|
.help("The URL of a bridged Substrate node")
|
||||||
|
.takes_value(true)
|
||||||
|
.multiple(true)
|
||||||
|
)
|
||||||
|
.get_matches();
|
||||||
|
|
||||||
|
let base_path = value_t!(matches, "base-path", String)
|
||||||
|
.unwrap_or_else(|e| e.exit());
|
||||||
|
let rpc_urls = values_t!(matches, "rpc-url", RPCUrlParam)
|
||||||
|
.unwrap_or_else(|e| e.exit());
|
||||||
|
|
||||||
|
Params {
|
||||||
|
base_path,
|
||||||
|
rpc_urls,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn setup_exit_handler() -> Box<dyn Future<Output=()> + Unpin + Send> {
|
||||||
|
let (exit_sender, exit_receiver) = channel::oneshot::channel();
|
||||||
|
let exit_sender = Cell::new(Some(exit_sender));
|
||||||
|
|
||||||
|
ctrlc::set_handler(move || {
|
||||||
|
if let Some(exit_sender) = exit_sender.take() {
|
||||||
|
if let Err(()) = exit_sender.send(()) {
|
||||||
|
log::warn!("failed to send exit signal");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.expect("must be able to set Ctrl-C handler");
|
||||||
|
|
||||||
|
Box::new(exit_receiver.map(|result| {
|
||||||
|
result.expect(
|
||||||
|
"exit_sender cannot be dropped as it is moved into a globally-referenced closure"
|
||||||
|
)
|
||||||
|
}))
|
||||||
|
}
|
||||||
@@ -0,0 +1,75 @@
|
|||||||
|
use crate::error::Error;
|
||||||
|
|
||||||
|
use url::Url;
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
const DEFAULT_WS_PORT: u16 = 9944;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Params {
|
||||||
|
pub base_path: String,
|
||||||
|
pub rpc_urls: Vec<RPCUrlParam>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct RPCUrlParam {
|
||||||
|
url: Url,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ToString for RPCUrlParam {
|
||||||
|
fn to_string(&self) -> String {
|
||||||
|
self.url.to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromStr for RPCUrlParam {
|
||||||
|
type Err = Error;
|
||||||
|
|
||||||
|
fn from_str(url_str: &str) -> Result<Self, Self::Err> {
|
||||||
|
let mut url = Url::parse(url_str)
|
||||||
|
.map_err(|e| Error::UrlError(format!("could not parse {}: {}", url_str, e)))?;
|
||||||
|
|
||||||
|
if url.scheme() != "ws" {
|
||||||
|
return Err(Error::UrlError(format!("must have scheme ws, found {}", url.scheme())));
|
||||||
|
}
|
||||||
|
|
||||||
|
if url.port().is_none() {
|
||||||
|
url.set_port(Some(DEFAULT_WS_PORT))
|
||||||
|
.expect("the scheme is checked above to be ws; qed");
|
||||||
|
}
|
||||||
|
|
||||||
|
if url.path() != "/" {
|
||||||
|
return Err(Error::UrlError(format!("cannot have a path, found {}", url.path())));
|
||||||
|
}
|
||||||
|
if let Some(query) = url.query() {
|
||||||
|
return Err(Error::UrlError(format!("cannot have a query, found {}", query)));
|
||||||
|
}
|
||||||
|
if let Some(fragment) = url.fragment() {
|
||||||
|
return Err(Error::UrlError(format!("cannot have a fragment, found {}", fragment)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(RPCUrlParam { url })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn rpc_url_from_str() {
|
||||||
|
assert_eq!(
|
||||||
|
RPCUrlParam::from_str("ws://127.0.0.1").unwrap().to_string(),
|
||||||
|
"ws://127.0.0.1:9944/"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
RPCUrlParam::from_str("ws://127.0.0.1/").unwrap().to_string(),
|
||||||
|
"ws://127.0.0.1:9944/"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
RPCUrlParam::from_str("ws://127.0.0.1:4499").unwrap().to_string(),
|
||||||
|
"ws://127.0.0.1:4499/"
|
||||||
|
);
|
||||||
|
assert!(RPCUrlParam::from_str("http://127.0.0.1").is_err());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,29 @@
|
|||||||
|
use jsonrpsee::core::client::{RawClient, RawClientError, TransportClient};
|
||||||
|
use node_primitives::{BlockNumber, Hash, Header};
|
||||||
|
use sp_core::Bytes;
|
||||||
|
use sp_rpc::number::NumberOrHex;
|
||||||
|
|
||||||
|
jsonrpsee::rpc_api! {
|
||||||
|
pub SubstrateRPC {
|
||||||
|
#[rpc(method = "author_submitExtrinsic", positional_params)]
|
||||||
|
fn author_submit_extrinsic(extrinsic: Bytes) -> Hash;
|
||||||
|
|
||||||
|
#[rpc(method = "chain_getFinalizedHead")]
|
||||||
|
fn chain_finalized_head() -> Hash;
|
||||||
|
|
||||||
|
#[rpc(method = "chain_getBlockHash", positional_params)]
|
||||||
|
fn chain_block_hash(id: Option<NumberOrHex<BlockNumber>>) -> Option<Hash>;
|
||||||
|
|
||||||
|
#[rpc(method = "chain_getHeader", positional_params)]
|
||||||
|
fn chain_header(hash: Option<Hash>) -> Option<Header>;
|
||||||
|
|
||||||
|
#[rpc(positional_params)]
|
||||||
|
fn state_call(name: String, bytes: Bytes, hash: Option<Hash>) -> Bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn genesis_block_hash<R: TransportClient>(client: &mut RawClient<R>)
|
||||||
|
-> Result<Option<Hash>, RawClientError<R::Error>>
|
||||||
|
{
|
||||||
|
SubstrateRPC::chain_block_hash(client, Some(NumberOrHex::Number(0))).await
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user