Remove substrate relay. (#244)

This commit is contained in:
Tomasz Drwięga
2020-07-29 11:00:16 +02:00
committed by Bastian Köcher
parent 8456d0d969
commit d3f6948050
7 changed files with 0 additions and 757 deletions
-33
View File
@@ -1,33 +0,0 @@
[package]
name = "substrate-bridge"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
async-std = "=1.5.0"
clap = "2.33.1"
ctrlc = "3.1.5"
derive_more = "0.99.9"
env_logger = "0.7.1"
futures = "0.3.5"
jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee", features = ["ws"] }
log = "0.4.11"
serde_json = "1.0.57"
url = "2.1.0"
[dependencies.sp-core]
version = "2.0.0-rc4"
tag = 'v2.0.0-rc4'
git = "https://github.com/paritytech/substrate/"
[dependencies.sp-rpc]
version = "2.0.0-rc4"
tag = 'v2.0.0-rc4'
git = "https://github.com/paritytech/substrate/"
[dependencies.node-primitives]
version = "2.0.0-rc4"
tag = 'v2.0.0-rc4'
git = "https://github.com/paritytech/substrate/"
-32
View File
@@ -1,32 +0,0 @@
# Substrate-to-Substrate Bridge Relay
The bridge relay is a process that connects to running Substrate nodes and sends data over the Substrate-to-Substrate bridge. The process communicates with the nodes over the JSON-RPC interface and reads data from the relays information required by the `bridge` pallet using runtime calls and writes data to the modules by constructing and submitting extrinsics.
For more details, see the [design document](doc/design.md).
## Status
This is a not-in-progress prototype.
## Running in development
Run two development Substrate chains:
```bash
> TMPDIR=(mktemp -d)
> cd $TMPDIR
> substrate build-spec --dev > red-spec.json
> cp red-spec.json blue-spec.json
# Modify the chain spec in an editor so that the genesis hashes of the two chains differ.
# For example, double one of the balances in '$.genesis.runtime.balances.balances'.
> substrate --chain red-spec.json --alice --base-path ./red --port 30343 --ws-port 9954
> substrate --chain blue-spec.json --alice --base-path ./blue --port 30353 --ws-port 9964
```
Now run the bridge relay:
```
> target/release/substrate-bridge --base-path ./relay \
--rpc-url ws://localhost:9954 \
--rpc-url ws://localhost:9964
```
-413
View File
@@ -1,413 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::error::Error;
use crate::params::{Params, RPCUrlParam};
use crate::rpc::{self, SubstrateRPC};
use futures::{
channel::{mpsc, oneshot},
future,
prelude::*,
select,
};
use jsonrpsee::{
raw::client::{RawClient, RawClientError, RawClientEvent, RawClientRequestId, RawClientSubscription},
transport::{
ws::{WsConnecError, WsTransportClient},
TransportClient,
},
};
use node_primitives::{Hash, Header};
use sp_core::Bytes;
use std::cell::RefCell;
use std::collections::HashMap;
use std::pin::Pin;
type ChainId = Hash;
struct BridgeState {
channel: mpsc::Sender<Event>,
locally_finalized_head_on_bridged_chain: Header,
}
struct ChainState {
current_finalized_head: Header,
bridges: HashMap<ChainId, BridgeState>,
}
enum Event {
SubmitExtrinsic(Bytes),
}
struct Chain {
url: String,
client: RawClient<WsTransportClient>,
sender: mpsc::Sender<Event>,
receiver: mpsc::Receiver<Event>,
genesis_hash: Hash,
state: ChainState,
}
async fn init_rpc_connection(url: &RPCUrlParam) -> Result<Chain, Error> {
let url_str = url.to_string();
log::debug!("Connecting to {}", url_str);
// Skip the leading "ws://" and trailing "/".
let url_without_scheme = &url_str[5..(url_str.len() - 1)];
let transport = WsTransportClient::new(url_without_scheme)
.await
.map_err(|err| Error::WsConnectionError(err.to_string()))?;
let mut client = RawClient::new(transport);
let genesis_hash = rpc::genesis_block_hash(&mut client)
.await
.map_err(|e| Error::RPCError(e.to_string()))?
.ok_or_else(|| {
Error::InvalidChainState(format!(
"chain with RPC URL {} is missing a genesis block hash",
url_str,
))
})?;
let latest_finalized_hash = SubstrateRPC::chain_finalized_head(&mut client)
.await
.map_err(|e| Error::RPCError(e.to_string()))?;
let latest_finalized_header = SubstrateRPC::chain_header(&mut client, Some(latest_finalized_hash))
.await
.map_err(|e| Error::RPCError(e.to_string()))?
.ok_or_else(|| {
Error::InvalidChainState(format!(
"chain {} is missing header for finalized block hash {}",
genesis_hash, latest_finalized_hash
))
})?;
let (sender, receiver) = mpsc::channel(0);
Ok(Chain {
url: url_str,
client,
sender,
receiver,
genesis_hash,
state: ChainState {
current_finalized_head: latest_finalized_header,
bridges: HashMap::new(),
},
})
}
/// Returns IDs of the bridged chains.
async fn read_bridges(chain: &mut Chain, chain_ids: &[Hash]) -> Result<Vec<Hash>, Error> {
// This should make an RPC call to read this information from the bridge pallet state.
// For now, just pretend every chain is bridged to every other chain.
//
// TODO: The correct thing.
Ok(chain_ids
.iter()
.cloned()
.filter(|&chain_id| chain_id != chain.genesis_hash)
.collect())
}
pub async fn run_async(params: Params, exit: Box<dyn Future<Output = ()> + Unpin + Send>) -> Result<(), Error> {
let chains = init_chains(&params).await?;
let (chain_tasks, exit_signals) = chains
.into_iter()
.map(|(chain_id, chain_cell)| {
let chain = chain_cell.into_inner();
let (task_exit_signal, task_exit_receiver) = oneshot::channel();
let task_exit = Box::new(
task_exit_receiver
.map(|result| result.expect("task_exit_signal is not dropped before send() is called")),
);
let chain_task = async_std::task::spawn(async move {
if let Err(err) = chain_task(chain_id, chain, task_exit).await {
log::error!("Error in task for chain {}: {}", chain_id, err);
}
});
(chain_task, task_exit_signal)
})
.unzip::<_, _, Vec<_>, Vec<_>>();
async_std::task::spawn(async move {
exit.await;
for exit_signal in exit_signals {
let _ = exit_signal.send(());
}
});
future::join_all(chain_tasks).await;
Ok(())
}
type EventsResult = Result<(ChainId, RawClientEvent), Error>;
type EventsFuture<'a> = Pin<Box<dyn Future<Output = EventsResult> + 'a>>;
fn initial_next_events<'a>(chains: &'a HashMap<ChainId, RefCell<Chain>>) -> Vec<EventsFuture<'a>> {
chains
.values()
.map(|chain_cell| async move {
let mut chain = chain_cell.borrow_mut();
let event = chain
.client
.next_event()
.await
.map_err(|err| Error::RPCError(err.to_string()))?;
Ok((chain.genesis_hash, event))
})
.map(|fut| Box::pin(fut) as Pin<Box<dyn Future<Output = _>>>)
.collect()
}
async fn next_event<'a>(
next_events: Vec<EventsFuture<'a>>,
chains: &'a HashMap<ChainId, RefCell<Chain>>,
) -> (Result<(Hash, RawClientEvent), Error>, Vec<EventsFuture<'a>>) {
let (result, _, mut rest) = future::select_all(next_events).await;
match result {
Ok((chain_id, _)) => {
let fut = async move {
let chain_cell = chains
.get(&chain_id)
.expect("chain must be in the map as a function precondition; qed");
let mut chain = chain_cell.borrow_mut();
let event = chain
.client
.next_event()
.await
.map_err(|err| Error::RPCError(err.to_string()))?;
Ok((chain_id, event))
};
rest.push(Box::pin(fut));
}
Err(ref err) => log::warn!("error in RPC connection with a chain: {}", err),
}
(result, rest)
}
async fn init_chains(params: &Params) -> Result<HashMap<ChainId, RefCell<Chain>>, Error> {
let chains = future::join_all(params.rpc_urls.iter().map(init_rpc_connection))
.await
.into_iter()
.map(|result| result.map(|chain| (chain.genesis_hash, RefCell::new(chain))))
.collect::<Result<HashMap<_, _>, _>>()?;
// TODO: Remove when read_bridges is implemented correctly.
let chain_ids = chains.keys().cloned().collect::<Vec<_>>();
// let chain_ids_slice = chain_ids.as_slice();
for (&chain_id, chain_cell) in chains.iter() {
let mut chain = chain_cell.borrow_mut();
for bridged_chain_id in read_bridges(&mut chain, &chain_ids).await? {
if chain_id == bridged_chain_id {
log::warn!("chain {} has a bridge to itself", chain_id);
continue;
}
if let Some(bridged_chain_cell) = chains.get(&bridged_chain_id) {
let bridged_chain = bridged_chain_cell.borrow_mut();
// TODO: Get this from RPC to runtime API.
let genesis_head = SubstrateRPC::chain_header(&mut chain.client, chain_id)
.await
.map_err(|e| Error::RPCError(e.to_string()))?
.ok_or_else(|| {
Error::InvalidChainState(format!("chain {} is missing a genesis block header", chain_id))
})?;
let channel = chain.sender.clone();
chain.state.bridges.insert(
bridged_chain_id,
BridgeState {
channel,
locally_finalized_head_on_bridged_chain: genesis_head,
},
);
// The conditional ensures that we don't log twice per pair of chains.
if chain_id.as_ref() < bridged_chain_id.as_ref() {
log::info!("initialized bridge between {} and {}", chain_id, bridged_chain_id);
}
}
}
}
Ok(chains)
}
async fn setup_subscriptions(
chain: &mut Chain,
) -> Result<(RawClientRequestId, RawClientRequestId), RawClientError<WsConnecError>> {
let new_heads_subscription_id = chain
.client
.start_subscription("chain_subscribeNewHeads", jsonrpsee::common::Params::None)
.await
.map_err(RawClientError::Inner)?;
let finalized_heads_subscription_id = chain
.client
.start_subscription("chain_subscribeFinalizedHeads", jsonrpsee::common::Params::None)
.await
.map_err(RawClientError::Inner)?;
let new_heads_subscription = chain
.client
.subscription_by_id(new_heads_subscription_id)
.expect("subscription_id was returned from start_subscription above; qed");
let new_heads_subscription = match new_heads_subscription {
RawClientSubscription::Active(_) => {}
RawClientSubscription::Pending(subscription) => {
subscription.wait().await?;
}
};
let finalized_heads_subscription = chain
.client
.subscription_by_id(finalized_heads_subscription_id)
.expect("subscription_id was returned from start_subscription above; qed");
let finalized_heads_subscription = match finalized_heads_subscription {
RawClientSubscription::Active(subscription) => {}
RawClientSubscription::Pending(subscription) => {
subscription.wait().await?;
}
};
Ok((new_heads_subscription_id, finalized_heads_subscription_id))
}
async fn handle_rpc_event(
chain_id: ChainId,
chain: &mut Chain,
event: RawClientEvent,
new_heads_subscription_id: RawClientRequestId,
finalized_heads_subscription_id: RawClientRequestId,
) -> Result<(), Error> {
match event {
RawClientEvent::SubscriptionNotif { request_id, result } => {
if request_id == new_heads_subscription_id {
let header: Header = serde_json::from_value(result).map_err(Error::SerializationError)?;
log::info!("Received new head {:?} on chain {}", header, chain_id);
} else if request_id == finalized_heads_subscription_id {
let header: Header = serde_json::from_value(result).map_err(Error::SerializationError)?;
log::info!("Received finalized head {:?} on chain {}", header, chain_id);
// let old_finalized_head = chain_state.current_finalized_head;
chain.state.current_finalized_head = header;
for (bridged_chain_id, bridged_chain) in chain.state.bridges.iter_mut() {
if bridged_chain.locally_finalized_head_on_bridged_chain.number
< chain.state.current_finalized_head.number
{
// Craft and submit an extrinsic over RPC
log::info!("Sending command to submit extrinsic to chain {}", chain_id);
let mut send_event = bridged_chain
.channel
.send(Event::SubmitExtrinsic(Bytes(Vec::new())))
.fuse();
// Continue processing events from other chain tasks while waiting to send
// event to other chain task in order to prevent deadlocks.
loop {
select! {
result = send_event => {
result.map_err(Error::ChannelError)?;
break;
}
event = chain.receiver.next().fuse() => {
let event = event
.expect("stream will never close as the chain has an mpsc Sender");
handle_bridge_event(chain_id, &mut chain.client, event)
.await?;
}
// TODO: exit
}
}
}
}
} else {
return Err(Error::RPCError(format!(
"unexpected subscription response with request ID {:?}",
request_id
)));
}
}
_ => {
return Err(Error::RPCError(format!(
"unexpected RPC event from chain {}: {:?}",
chain_id, event
)))
}
}
Ok(())
}
// Let's say this never sends over a channel (ie. cannot block on another task).
async fn handle_bridge_event<R: TransportClient>(
chain_id: ChainId,
rpc_client: &mut RawClient<R>,
event: Event,
) -> Result<(), Error> {
match event {
Event::SubmitExtrinsic(data) => {
log::info!("Submitting extrinsic to chain {}", chain_id);
if let Err(err) = SubstrateRPC::author_submit_extrinsic(rpc_client, data).await {
log::error!("failed to submit extrinsic: {}", err);
}
}
}
Ok(())
}
async fn chain_task(
chain_id: ChainId,
mut chain: Chain,
exit: impl Future<Output = ()> + Unpin + Send,
) -> Result<(), Error> {
let (new_heads_subscription_id, finalized_heads_subscription_id) = setup_subscriptions(&mut chain)
.await
.map_err(|e| Error::RPCError(e.to_string()))?;
let mut exit = exit.fuse();
loop {
select! {
result = chain.client.next_event().fuse() => {
let event = result.map_err(|e| Error::RPCError(e.to_string()))?;
handle_rpc_event(
chain_id,
&mut chain,
event,
new_heads_subscription_id,
finalized_heads_subscription_id,
).await?;
}
event = chain.receiver.next().fuse() => {
let event = event
.expect("stream will never close as the chain has an mpsc Sender");
handle_bridge_event(chain_id, &mut chain.client, event)
.await?;
}
_ = exit => {
log::debug!("Received exit signal, shutting down task for chain {}", chain_id);
break;
}
}
}
Ok(())
}
-47
View File
@@ -1,47 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use futures::channel::mpsc;
#[derive(Debug, derive_more::Display)]
pub enum Error {
#[display(fmt = "invalid RPC URL: {}", _0)]
UrlError(String),
#[display(fmt = "RPC response indicates invalid chain state: {}", _0)]
InvalidChainState(String),
#[display(fmt = "could not make RPC call: {}", _0)]
RPCError(String),
#[display(fmt = "could not connect to RPC URL: {}", _0)]
WsConnectionError(String),
#[display(fmt = "unexpected client event from RPC URL {}: {:?}", _0, _1)]
UnexpectedClientEvent(String, String),
#[display(fmt = "serialization error: {}", _0)]
SerializationError(serde_json::error::Error),
#[display(fmt = "invalid event received from bridged chain: {}", _0)]
InvalidBridgeEvent(String),
#[display(fmt = "error sending over MPSC channel: {}", _0)]
ChannelError(mpsc::SendError),
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Error::SerializationError(err) => Some(err),
Error::ChannelError(err) => Some(err),
_ => None,
}
}
}
-93
View File
@@ -1,93 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
// TODO: when (if) this relay will be resurrected, remove this
#![allow(unused_variables)]
#![allow(dead_code)]
mod bridge;
mod error;
mod params;
mod rpc;
use bridge::run_async;
use params::{Params, RPCUrlParam};
use clap::{value_t, values_t, App, Arg};
use futures::{channel, prelude::*};
use std::cell::Cell;
use std::process;
fn main() {
let params = parse_args();
env_logger::init();
let exit = setup_exit_handler();
let result = async_std::task::block_on(async move { run_async(params, exit).await });
if let Err(err) = result {
log::error!("{}", err);
process::exit(1);
}
}
fn parse_args() -> Params {
let matches = App::new("substrate-bridge")
.version("1.0")
.author("Parity Technologies")
.about("Bridges Substrates, duh")
.arg(
Arg::with_name("base-path")
.long("base-path")
.value_name("DIRECTORY")
.required(true)
.help("Sets the base path")
.takes_value(true),
)
.arg(
Arg::with_name("rpc-url")
.long("rpc-url")
.value_name("HOST[:PORT]")
.help("The URL of a bridged Substrate node")
.takes_value(true)
.multiple(true),
)
.get_matches();
let base_path = value_t!(matches, "base-path", String).unwrap_or_else(|e| e.exit());
let rpc_urls = values_t!(matches, "rpc-url", RPCUrlParam).unwrap_or_else(|e| e.exit());
Params { base_path, rpc_urls }
}
fn setup_exit_handler() -> Box<dyn Future<Output = ()> + Unpin + Send> {
let (exit_sender, exit_receiver) = channel::oneshot::channel();
let exit_sender = Cell::new(Some(exit_sender));
ctrlc::set_handler(move || {
if let Some(exit_sender) = exit_sender.take() {
if let Err(()) = exit_sender.send(()) {
log::warn!("failed to send exit signal");
}
}
})
.expect("must be able to set Ctrl-C handler");
Box::new(
exit_receiver.map(|result| {
result.expect("exit_sender cannot be dropped as it is moved into a globally-referenced closure")
}),
)
}
-91
View File
@@ -1,91 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::error::Error;
use std::str::FromStr;
use url::Url;
const DEFAULT_WS_PORT: u16 = 9944;
#[derive(Debug, Clone)]
pub struct Params {
pub base_path: String,
pub rpc_urls: Vec<RPCUrlParam>,
}
#[derive(Debug, Clone)]
pub struct RPCUrlParam {
url: Url,
}
impl ToString for RPCUrlParam {
fn to_string(&self) -> String {
self.url.to_string()
}
}
impl FromStr for RPCUrlParam {
type Err = Error;
fn from_str(url_str: &str) -> Result<Self, Self::Err> {
let mut url =
Url::parse(url_str).map_err(|e| Error::UrlError(format!("could not parse {}: {}", url_str, e)))?;
if url.scheme() != "ws" {
return Err(Error::UrlError(format!("must have scheme ws, found {}", url.scheme())));
}
if url.port().is_none() {
url.set_port(Some(DEFAULT_WS_PORT))
.expect("the scheme is checked above to be ws; qed");
}
if url.path() != "/" {
return Err(Error::UrlError(format!("cannot have a path, found {}", url.path())));
}
if let Some(query) = url.query() {
return Err(Error::UrlError(format!("cannot have a query, found {}", query)));
}
if let Some(fragment) = url.fragment() {
return Err(Error::UrlError(format!("cannot have a fragment, found {}", fragment)));
}
Ok(RPCUrlParam { url })
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn rpc_url_from_str() {
assert_eq!(
RPCUrlParam::from_str("ws://127.0.0.1").unwrap().to_string(),
"ws://127.0.0.1:9944/"
);
assert_eq!(
RPCUrlParam::from_str("ws://127.0.0.1/").unwrap().to_string(),
"ws://127.0.0.1:9944/"
);
assert_eq!(
RPCUrlParam::from_str("ws://127.0.0.1:4499").unwrap().to_string(),
"ws://127.0.0.1:4499/"
);
assert!(RPCUrlParam::from_str("http://127.0.0.1").is_err());
}
}
-48
View File
@@ -1,48 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use jsonrpsee::{
raw::client::{RawClient, RawClientError},
transport::TransportClient,
};
use node_primitives::{Hash, Header};
use sp_core::Bytes;
use sp_rpc::number::NumberOrHex;
jsonrpsee::rpc_api! {
pub SubstrateRPC {
#[rpc(method = "author_submitExtrinsic", positional_params)]
fn author_submit_extrinsic(extrinsic: Bytes) -> Hash;
#[rpc(method = "chain_getFinalizedHead")]
fn chain_finalized_head() -> Hash;
#[rpc(method = "chain_getBlockHash", positional_params)]
fn chain_block_hash(id: Option<NumberOrHex>) -> Option<Hash>;
#[rpc(method = "chain_getHeader", positional_params)]
fn chain_header(hash: Option<Hash>) -> Option<Header>;
#[rpc(positional_params)]
fn state_call(name: String, bytes: Bytes, hash: Option<Hash>) -> Bytes;
}
}
pub async fn genesis_block_hash<R: TransportClient>(
client: &mut RawClient<R>,
) -> Result<Option<Hash>, RawClientError<R::Error>> {
SubstrateRPC::chain_block_hash(client, Some(NumberOrHex::Number(0))).await
}