Use libp2p's implementation of a wasm websocket transport (#5089)

* Update to libp2p 0.16.2

* Use libp2ps implementation of a wasm websocket transport

* Remove explicit Configuration type in node-cli

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
Ashley
2020-03-10 11:14:20 +01:00
committed by GitHub
parent 1cfcf5cbfe
commit 3429967122
6 changed files with 13 additions and 165 deletions
+1 -1
View File
@@ -7656,7 +7656,7 @@ dependencies = [
"futures-timer 3.0.2",
"js-sys",
"kvdb-web",
"libp2p",
"libp2p-wasm-ext",
"log 0.4.8",
"rand 0.6.5",
"rand 0.7.3",
@@ -6,7 +6,6 @@
<link rel="shortcut icon" href="/favicon.png" />
<script type="module">
import { start_client, default as init } from './pkg/node_cli.js';
import ws from './ws.js';
function log(msg) {
document.getElementsByTagName('body')[0].innerHTML += msg + '\n';
@@ -20,7 +19,7 @@ async function start() {
const chain_spec_text = await chain_spec_response.text();
// Build our client.
let client = await start_client(chain_spec_text, 'info', ws());
let client = await start_client(chain_spec_text, 'info');
log('Client started');
client.rpcSubscribe('{"method":"chain_subscribeNewHead","params":[],"id":1,"jsonrpc":"2.0"}',
-148
View File
@@ -1,148 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
export default () => {
return {
dial: dial,
listen_on: (addr) => {
let err = new Error("Listening on WebSockets is not possible from within a browser");
err.name = "NotSupportedError";
throw err;
},
};
}
/// Turns a string multiaddress into a WebSockets string URL.
// TODO: support dns addresses as well
const multiaddr_to_ws = (addr) => {
let parsed = addr.match(/^\/(ip4|ip6|dns4|dns6)\/(.*?)\/tcp\/(.*?)\/(ws|wss|x-parity-ws\/(.*)|x-parity-wss\/(.*))$/);
let proto = 'wss';
if (parsed[4] == 'ws' || parsed[4] == 'x-parity-ws') {
proto = 'ws';
}
let url = decodeURIComponent(parsed[5] || parsed[6] || '');
if (parsed != null) {
if (parsed[1] == 'ip6') {
return proto + "://[" + parsed[2] + "]:" + parsed[3] + url;
} else {
return proto + "://" + parsed[2] + ":" + parsed[3] + url;
}
}
let err = new Error("Address not supported: " + addr);
err.name = "NotSupportedError";
throw err;
}
// Attempt to dial a multiaddress.
const dial = (addr) => {
let ws = new WebSocket(multiaddr_to_ws(addr));
let reader = read_queue();
return new Promise((resolve, reject) => {
// TODO: handle ws.onerror properly after dialing has happened
ws.onerror = (ev) => reject(ev);
ws.onmessage = (ev) => reader.inject_blob(ev.data);
ws.onclose = () => reader.inject_eof();
ws.onopen = () => resolve({
read: (function*() { while(ws.readyState == 1) { yield reader.next(); } })(),
write: (data) => {
if (ws.readyState == 1) {
ws.send(data);
return promise_when_ws_finished(ws);
} else {
return Promise.reject("WebSocket is closed");
}
},
shutdown: () => {},
close: () => ws.close()
});
});
}
// Takes a WebSocket object and returns a Promise that resolves when bufferedAmount is 0.
const promise_when_ws_finished = (ws) => {
if (ws.bufferedAmount == 0) {
return Promise.resolve();
}
return new Promise((resolve, reject) => {
setTimeout(function check() {
if (ws.bufferedAmount == 0) {
resolve();
} else {
setTimeout(check, 100);
}
}, 2);
})
}
// Creates a queue reading system.
const read_queue = () => {
// State of the queue.
let state = {
// Array of promises resolving to `ArrayBuffer`s, that haven't been transmitted back with
// `next` yet.
queue: new Array(),
// If `resolve` isn't null, it is a "resolve" function of a promise that has already been
// returned by `next`. It should be called with some data.
resolve: null,
};
return {
// Inserts a new Blob in the queue.
inject_blob: (blob) => {
if (state.resolve != null) {
var resolve = state.resolve;
state.resolve = null;
var reader = new FileReader();
reader.addEventListener("loadend", () => resolve(reader.result));
reader.readAsArrayBuffer(blob);
} else {
state.queue.push(new Promise((resolve, reject) => {
var reader = new FileReader();
reader.addEventListener("loadend", () => resolve(reader.result));
reader.readAsArrayBuffer(blob);
}));
}
},
// Inserts an EOF message in the queue.
inject_eof: () => {
if (state.resolve != null) {
var resolve = state.resolve;
state.resolve = null;
resolve(null);
} else {
state.queue.push(Promise.resolve(null));
}
},
// Returns a Promise that yields the next entry as an ArrayBuffer.
next: () => {
if (state.queue.length != 0) {
return state.queue.shift(0);
} else {
if (state.resolve !== null)
throw "Internal error: already have a pending promise";
return new Promise((resolve, reject) => {
state.resolve = resolve;
});
}
}
};
};
+5 -6
View File
@@ -19,27 +19,26 @@ use log::info;
use wasm_bindgen::prelude::*;
use sc_service::Configuration;
use browser_utils::{
Transport, Client,
Client,
browser_configuration, set_console_error_panic_hook, init_console_log,
};
use std::str::FromStr;
/// Starts the client.
#[wasm_bindgen]
pub async fn start_client(chain_spec: String, log_level: String, wasm_ext: Transport) -> Result<Client, JsValue> {
start_inner(chain_spec, log_level, wasm_ext)
pub async fn start_client(chain_spec: String, log_level: String) -> Result<Client, JsValue> {
start_inner(chain_spec, log_level)
.await
.map_err(|err| JsValue::from_str(&err.to_string()))
}
async fn start_inner(chain_spec: String, log_level: String, wasm_ext: Transport) -> Result<Client, Box<dyn std::error::Error>> {
async fn start_inner(chain_spec: String, log_level: String) -> Result<Client, Box<dyn std::error::Error>> {
set_console_error_panic_hook();
init_console_log(log::Level::from_str(&log_level)?)?;
let chain_spec = ChainSpec::from_json_bytes(chain_spec.as_bytes().to_vec())
.map_err(|e| format!("{:?}", e))?;
let config: Configuration<_, _> = browser_configuration(wasm_ext, chain_spec)
.await?;
let config = browser_configuration(chain_spec).await?;
info!("Substrate browser node");
info!(" version {}", config.full_version());
+1 -1
View File
@@ -12,7 +12,7 @@ repository = "https://github.com/paritytech/substrate/"
futures = "0.3"
futures01 = { package = "futures", version = "0.1.29" }
log = "0.4.8"
libp2p = { version = "0.16.2", default-features = false }
libp2p-wasm-ext = { version = "0.16.2", features = ["websocket"] }
console_error_panic_hook = "0.1.6"
console_log = "0.1.2"
js-sys = "0.3.34"
+5 -7
View File
@@ -26,25 +26,23 @@ use futures::{prelude::*, channel::{oneshot, mpsc}, future::{poll_fn, ok}, compa
use std::task::Poll;
use std::pin::Pin;
use sc_chain_spec::Extension;
use libp2p_wasm_ext::{ExtTransport, ffi};
pub use libp2p::wasm_ext::{ExtTransport, ffi::Transport};
pub use console_error_panic_hook::set_once as set_console_error_panic_hook;
pub use console_log::init_with_level as init_console_log;
/// Create a service configuration from a chain spec and the websocket transport.
/// Create a service configuration from a chain spec.
///
/// This configuration contains good defaults for a browser light client.
pub async fn browser_configuration<G, E>(
transport: Transport,
chain_spec: ChainSpec<G, E>,
) -> Result<Configuration<G, E>, Box<dyn std::error::Error>>
pub async fn browser_configuration<G, E>(chain_spec: ChainSpec<G, E>)
-> Result<Configuration<G, E>, Box<dyn std::error::Error>>
where
G: RuntimeGenesis,
E: Extension,
{
let name = chain_spec.name().to_string();
let transport = ExtTransport::new(transport);
let transport = ExtTransport::new(ffi::websocket_transport());
let mut config = Configuration::default();
config.network.boot_nodes = chain_spec.boot_nodes().to_vec();
config.telemetry_endpoints = chain_spec.telemetry_endpoints().clone();