Companion PR for Substrate#4394 (#723)

* service/src/lib.rs: Register network event stream for authority disc

Previously one would create a sender and receiver channel pair, pass the
sender to the build_network_future through the service builder and
funnel network events returned from polling the network service into the
sender to be consumed by the authority discovery module owning the
receiver.

With recent changes it is now possible to register an event_stream
with the network service directly, thus one does not need to make the
detour through the build_network_future.

This commit is an adjusted clone of one targeting the Substrate
repository.

* service/src/lib.rs: Fix futures::stream imports

* [TMP] *: Replace polkadot-upstream with feature branch

* Switch branch

* Small change

* Companion PR to substrate#4542

* Revert "Merge remote-tracking branch 'tomaka/companion-4542' into ashley-browser-utils"

This reverts commit 17f00afe483ee65cb3cf4a0faca27034e6d6523a, reversing
changes made to 928cbb9c55542baff56b53accd9a5a45f12f01f1.

* ashley-browser-utils -> ashley-browser-utils-polkadot

* Switch branches back

Co-authored-by: Max Inden <mail@max-inden.de>
Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
Ashley
2020-01-09 18:55:40 +01:00
committed by Gavin Wood
parent d9b8ba7707
commit 628dd08c20
4 changed files with 45 additions and 176 deletions
+10 -24
View File
@@ -10,41 +10,27 @@ crate-type = ["cdylib", "rlib"]
[dependencies] [dependencies]
log = "0.4.8" log = "0.4.8"
tokio = "0.1.22"
futures = { version = "0.3.1", features = ["compat"] } futures = { version = "0.3.1", features = ["compat"] }
futures01 = { package = "futures", version = "0.1.29" } futures01 = { package = "futures", version = "0.1.29" }
structopt = "=0.3.7" structopt = "=0.3.7"
cli = { package = "sc-cli", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sc-cli = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
service = { package = "polkadot-service", path = "../service", default-features = false } service = { package = "polkadot-service", path = "../service", default-features = false }
libp2p = { version = "0.13.1", default-features = false, optional = true } tokio = { version = "0.1.22", optional = true }
wasm-bindgen = { version = "0.2.55", optional = true }
wasm-bindgen-futures = { version = "0.4.5", optional = true }
console_log = { version = "0.1.2", optional = true }
console_error_panic_hook = { version = "0.1.1", optional = true }
js-sys = { version = "0.3.22", optional = true }
kvdb-web = { version = "0.1.1", optional = true }
substrate-service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", optional = true, default-features = false }
substrate-network = { package = "sc-network", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", optional = true }
# Imported just for the `wasm-bindgen` feature wasm-bindgen = { version = "0.2.57", optional = true }
rand = { version = "0.7", features = ["wasm-bindgen"], optional = true } wasm-bindgen-futures = { version = "0.4.7", optional = true }
rand6 = { package = "rand", version = "0.6.5", features = ["wasm-bindgen"], optional = true } browser-utils = { package = "browser-utils", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", optional = true }
substrate-service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", optional = true, default-features = false }
[features] [features]
default = [ "wasmtime", "rocksdb" ] default = [ "wasmtime", "rocksdb", "cli" ]
wasmtime = [ "cli/wasmtime" ] wasmtime = [ "sc-cli/wasmtime" ]
rocksdb = [ "service/rocksdb" ] rocksdb = [ "service/rocksdb" ]
cli = [ "tokio" ]
browser = [ browser = [
"libp2p",
"wasm-bindgen", "wasm-bindgen",
"console_error_panic_hook",
"wasm-bindgen-futures", "wasm-bindgen-futures",
"console_log", "browser-utils",
"js-sys",
"kvdb-web",
"substrate-service", "substrate-service",
"substrate-network",
"rand",
"rand6",
] ]
+12 -133
View File
@@ -15,51 +15,28 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>. // along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::ChainSpec; use crate::ChainSpec;
use futures01::{prelude::*, sync::oneshot, sync::mpsc}; use log::info;
use libp2p::wasm_ext; use substrate_service::Configuration;
use log::{debug, info};
use std::sync::Arc;
use service::{AbstractService, Roles as ServiceRoles};
use substrate_service::{RpcSession, Configuration, config::DatabaseConfig};
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
use futures::{compat::*, TryFutureExt as _, TryStreamExt as _, FutureExt as _}; use service::CustomConfiguration;
/// Starts the client. /// Starts the client.
/// ///
/// You must pass a libp2p transport that supports . /// You must pass a libp2p transport that supports .
#[wasm_bindgen] #[wasm_bindgen]
pub async fn start_client(wasm_ext: wasm_ext::ffi::Transport) -> Result<Client, JsValue> { pub async fn start_client(wasm_ext: browser_utils::Transport) -> Result<browser_utils::Client, JsValue> {
start_inner(wasm_ext) start_inner(wasm_ext)
.await .await
.map_err(|err| JsValue::from_str(&err.to_string())) .map_err(|err| JsValue::from_str(&err.to_string()))
} }
async fn start_inner(wasm_ext: wasm_ext::ffi::Transport) -> Result<Client, Box<dyn std::error::Error>> { async fn start_inner(wasm_ext: browser_utils::Transport) -> Result<browser_utils::Client, Box<dyn std::error::Error>> {
console_error_panic_hook::set_once(); browser_utils::set_console_error_panic_hook();
console_log::init_with_level(log::Level::Info); browser_utils::init_console_log(log::Level::Info)?;
// Build the configuration to pass to the service. let chain_spec = ChainSpec::Kusama.load().map_err(|e| format!("{:?}", e))?;
let config = { let config: Configuration<CustomConfiguration, _, _> = browser_utils::browser_configuration(wasm_ext, chain_spec)
let wasm_ext = wasm_ext::ExtTransport::new(wasm_ext); .await?;
let chain_spec = ChainSpec::Kusama.load().map_err(|e| format!("{:?}", e))?;
let mut config = Configuration::<service::CustomConfiguration, _, _>::default_with_spec_and_base_path(chain_spec, None);
config.network.transport = substrate_network::config::TransportConfig::Normal {
wasm_external_transport: Some(wasm_ext.clone()),
allow_private_ipv4: true,
enable_mdns: false,
};
config.telemetry_external_transport = Some(wasm_ext);
config.roles = ServiceRoles::LIGHT;
config.name = "Browser node".to_string();
config.database = {
let db = kvdb_web::Database::open("polkadot".into(), 10)
.await
.unwrap();
DatabaseConfig::Custom(Arc::new(db))
};
config.keystore_path = Some(std::path::PathBuf::from("/"));
config
};
info!("Polkadot browser node"); info!("Polkadot browser node");
info!(" version {}", config.full_version()); info!(" version {}", config.full_version());
@@ -76,105 +53,7 @@ async fn start_inner(wasm_ext: wasm_ext::ffi::Transport) -> Result<Client, Box<d
info!("Roles: {:?}", config.roles); info!("Roles: {:?}", config.roles);
// Create the service. This is the most heavy initialization step. // Create the service. This is the most heavy initialization step.
let mut service = service::new_light(config).map_err(|e| format!("{:?}", e))?; let service = service::kusama_new_light(config).map_err(|e| format!("{:?}", e))?;
// We now dispatch a background task responsible for processing the service. Ok(browser_utils::start_client(service))
//
// The main action performed by the code below consists in polling the service with
// `service.poll()`.
// The rest consists in handling RPC requests.
let (rpc_send_tx, mut rpc_send_rx) = mpsc::unbounded::<RpcMessage>();
wasm_bindgen_futures::spawn_local(futures01::future::poll_fn(move || {
loop {
match rpc_send_rx.poll() {
Ok(Async::Ready(Some(message))) => {
let fut = service.rpc_query(&message.session, &message.rpc_json);
let _ = message.send_back.send(Box::new(fut));
},
Ok(Async::NotReady) => break,
Err(_) | Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
}
}
loop {
match service.poll().map_err(|_| ())? {
Async::Ready(()) => return Ok(Async::Ready(())),
Async::NotReady => break
}
}
Ok::<_, ()>(Async::NotReady)
}).compat().map(drop));
Ok(Client {
rpc_send_tx,
})
}
/// A running client.
#[wasm_bindgen]
pub struct Client {
rpc_send_tx: mpsc::UnboundedSender<RpcMessage>,
}
struct RpcMessage {
rpc_json: String,
session: RpcSession,
send_back: oneshot::Sender<Box<dyn Future<Item = Option<String>, Error = ()> + Unpin>>,
}
#[wasm_bindgen]
impl Client {
/// Allows starting an RPC request. Returns a `Promise` containing the result of that request.
#[wasm_bindgen(js_name = "rpcSend")]
pub fn rpc_send(&mut self, rpc: &str) -> js_sys::Promise {
let rpc_session = RpcSession::new(mpsc::channel(1).0);
let (tx, rx) = oneshot::channel();
let _ = self.rpc_send_tx.unbounded_send(RpcMessage {
rpc_json: rpc.to_owned(),
session: rpc_session,
send_back: tx,
});
let fut = rx
.compat()
.map_err(|_| ())
.and_then(|fut| fut.compat())
.map_ok(|s| JsValue::from_str(&s.unwrap_or(String::new())))
.map_err(|_| JsValue::NULL);
wasm_bindgen_futures::future_to_promise(fut)
}
/// Subscribes to an RPC pubsub endpoint.
#[wasm_bindgen(js_name = "rpcSubscribe")]
pub fn rpc_subscribe(&mut self, rpc: &str, callback: js_sys::Function) {
let (tx, rx) = mpsc::channel(4);
let rpc_session = RpcSession::new(tx);
let (fut_tx, fut_rx) = oneshot::channel();
let _ = self.rpc_send_tx.unbounded_send(RpcMessage {
rpc_json: rpc.to_owned(),
session: rpc_session.clone(),
send_back: fut_tx,
});
let fut_rx = fut_rx
.compat()
.map_err(|_| ())
.and_then(|fut| fut.compat())
.map(drop);
wasm_bindgen_futures::spawn_local(fut_rx);
wasm_bindgen_futures::spawn_local(rx
.compat()
.try_for_each(move |s| {
match callback.call1(&callback, &JsValue::from_str(&s)) {
Ok(_) => futures::future::ready(Ok(())),
Err(_) => futures::future::ready(Err(())),
}
})
.then(move |_| {
// We need to keep `rpc_session` alive.
debug!("RPC subscription has ended");
drop(rpc_session);
futures::future::ready(())
})
);
}
} }
+22 -18
View File
@@ -27,6 +27,7 @@ use chain_spec::ChainSpec;
use futures::{ use futures::{
Future, FutureExt, TryFutureExt, future::select, channel::oneshot, compat::Future01CompatExt, Future, FutureExt, TryFutureExt, future::select, channel::oneshot, compat::Future01CompatExt,
}; };
#[cfg(feature = "cli")]
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use log::info; use log::info;
use structopt::StructOpt; use structopt::StructOpt;
@@ -36,8 +37,8 @@ pub use service::{
WrappedExecutor WrappedExecutor
}; };
pub use cli::{VersionInfo, IntoExit, NoCustom, SharedParams}; pub use sc_cli::{VersionInfo, IntoExit, NoCustom, SharedParams};
pub use cli::{display_role, error}; pub use sc_cli::{display_role, error};
/// Load the `ChainSpec` for the given `id`. /// Load the `ChainSpec` for the given `id`.
pub fn load_spec(id: &str) -> Result<Option<service::ChainSpec>, String> { pub fn load_spec(id: &str) -> Result<Option<service::ChainSpec>, String> {
@@ -53,8 +54,8 @@ enum PolkadotSubCommands {
ValidationWorker(ValidationWorkerCommand), ValidationWorker(ValidationWorkerCommand),
} }
impl cli::GetSharedParams for PolkadotSubCommands { impl sc_cli::GetSharedParams for PolkadotSubCommands {
fn shared_params(&self) -> Option<&cli::SharedParams> { None } fn shared_params(&self) -> Option<&sc_cli::SharedParams> { None }
} }
#[derive(Debug, StructOpt, Clone)] #[derive(Debug, StructOpt, Clone)]
@@ -70,8 +71,9 @@ struct PolkadotSubParams {
} }
/// Parses polkadot specific CLI arguments and run the service. /// Parses polkadot specific CLI arguments and run the service.
pub fn run<E: IntoExit>(exit: E, version: cli::VersionInfo) -> error::Result<()> { #[cfg(feature = "cli")]
let cmd = cli::parse_and_prepare::<PolkadotSubCommands, PolkadotSubParams, _>( pub fn run<E: IntoExit>(exit: E, version: sc_cli::VersionInfo) -> error::Result<()> {
let cmd = sc_cli::parse_and_prepare::<PolkadotSubCommands, PolkadotSubParams, _>(
&version, &version,
"parity-polkadot", "parity-polkadot",
std::env::args(), std::env::args(),
@@ -79,7 +81,7 @@ pub fn run<E: IntoExit>(exit: E, version: cli::VersionInfo) -> error::Result<()>
// Preload spec to select native runtime // Preload spec to select native runtime
let spec = match cmd.shared_params() { let spec = match cmd.shared_params() {
Some(params) => Some(cli::load_spec(params, &load_spec)?), Some(params) => Some(sc_cli::load_spec(params, &load_spec)?),
None => None, None => None,
}; };
if spec.as_ref().map_or(false, |c| c.is_kusama()) { if spec.as_ref().map_or(false, |c| c.is_kusama()) {
@@ -100,10 +102,11 @@ pub fn run<E: IntoExit>(exit: E, version: cli::VersionInfo) -> error::Result<()>
} }
/// Execute the given `cmd` with the given runtime. /// Execute the given `cmd` with the given runtime.
#[cfg(feature = "cli")]
fn execute_cmd_with_runtime<R, D, E, X>( fn execute_cmd_with_runtime<R, D, E, X>(
exit: X, exit: X,
version: &cli::VersionInfo, version: &sc_cli::VersionInfo,
cmd: cli::ParseAndPrepare<PolkadotSubCommands, PolkadotSubParams>, cmd: sc_cli::ParseAndPrepare<PolkadotSubCommands, PolkadotSubParams>,
spec: Option<service::ChainSpec>, spec: Option<service::ChainSpec>,
) -> error::Result<()> ) -> error::Result<()>
where where
@@ -120,7 +123,7 @@ where
// Use preloaded spec // Use preloaded spec
let load_spec = |_: &str| Ok(spec); let load_spec = |_: &str| Ok(spec);
match cmd { match cmd {
cli::ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit, sc_cli::ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit,
|exit, _cli_args, custom_args, mut config| { |exit, _cli_args, custom_args, mut config| {
info!("{}", version.name); info!("{}", version.name);
info!(" version {}", config.full_version()); info!(" version {}", config.full_version());
@@ -154,17 +157,17 @@ where
), ),
}.map_err(|e| format!("{:?}", e)) }.map_err(|e| format!("{:?}", e))
}), }),
cli::ParseAndPrepare::BuildSpec(cmd) => cmd.run::<NoCustom, _, _, _>(load_spec), sc_cli::ParseAndPrepare::BuildSpec(cmd) => cmd.run::<NoCustom, _, _, _>(load_spec),
cli::ParseAndPrepare::ExportBlocks(cmd) => cmd.run_with_builder::<_, _, _, _, _, _, _>(|config| sc_cli::ParseAndPrepare::ExportBlocks(cmd) => cmd.run_with_builder::<_, _, _, _, _, _, _>(|config|
Ok(service::new_chain_ops::<R, D, E>(config)?), load_spec, exit), Ok(service::new_chain_ops::<R, D, E>(config)?), load_spec, exit),
cli::ParseAndPrepare::ImportBlocks(cmd) => cmd.run_with_builder::<_, _, _, _, _, _, _>(|config| sc_cli::ParseAndPrepare::ImportBlocks(cmd) => cmd.run_with_builder::<_, _, _, _, _, _, _>(|config|
Ok(service::new_chain_ops::<R, D, E>(config)?), load_spec, exit), Ok(service::new_chain_ops::<R, D, E>(config)?), load_spec, exit),
cli::ParseAndPrepare::CheckBlock(cmd) => cmd.run_with_builder::<_, _, _, _, _, _, _>(|config| sc_cli::ParseAndPrepare::CheckBlock(cmd) => cmd.run_with_builder::<_, _, _, _, _, _, _>(|config|
Ok(service::new_chain_ops::<R, D, E>(config)?), load_spec, exit), Ok(service::new_chain_ops::<R, D, E>(config)?), load_spec, exit),
cli::ParseAndPrepare::PurgeChain(cmd) => cmd.run(load_spec), sc_cli::ParseAndPrepare::PurgeChain(cmd) => cmd.run(load_spec),
cli::ParseAndPrepare::RevertChain(cmd) => cmd.run_with_builder::<_, _, _, _, _, _>(|config| sc_cli::ParseAndPrepare::RevertChain(cmd) => cmd.run_with_builder::<_, _, _, _, _, _>(|config|
Ok(service::new_chain_ops::<R, D, E>(config)?), load_spec), Ok(service::new_chain_ops::<R, D, E>(config)?), load_spec),
cli::ParseAndPrepare::CustomCommand(PolkadotSubCommands::ValidationWorker(args)) => { sc_cli::ParseAndPrepare::CustomCommand(PolkadotSubCommands::ValidationWorker(args)) => {
if cfg!(feature = "browser") { if cfg!(feature = "browser") {
Err(error::Error::Input("Cannot run validation worker in browser".into())) Err(error::Error::Input("Cannot run validation worker in browser".into()))
} else { } else {
@@ -177,6 +180,7 @@ where
} }
/// Run the given `service` using the `runtime` until it exits or `e` fires. /// Run the given `service` using the `runtime` until it exits or `e` fires.
#[cfg(feature = "cli")]
pub fn run_until_exit( pub fn run_until_exit(
mut runtime: Runtime, mut runtime: Runtime,
service: impl AbstractService, service: impl AbstractService,
@@ -185,7 +189,7 @@ pub fn run_until_exit(
let (exit_send, exit) = oneshot::channel(); let (exit_send, exit) = oneshot::channel();
let executor = runtime.executor(); let executor = runtime.executor();
let informant = cli::informant::build(&service); let informant = sc_cli::informant::build(&service);
let future = select(exit, informant) let future = select(exit, informant)
.map(|_| Ok(())) .map(|_| Ok(()))
.compat(); .compat();
+1 -1
View File
@@ -35,7 +35,7 @@ consensus_common = { package = "sp-consensus", git = "https://github.com/parityt
grandpa = { package = "sc-finality-grandpa", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } grandpa = { package = "sc-finality-grandpa", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
grandpa_primitives = { package = "sp-finality-grandpa", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } grandpa_primitives = { package = "sp-finality-grandpa", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
inherents = { package = "sp-inherents", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } inherents = { package = "sp-inherents", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", default-features = false }
telemetry = { package = "sc-telemetry", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } telemetry = { package = "sc-telemetry", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sc-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sc-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sp-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }