From a3eec9362f39282b18aa0abe83d76c524466f1ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 17 Apr 2018 13:03:57 +0200 Subject: [PATCH] Chain head subscription (#126) * Start WebSockets server. * Expose non-working subscription. * Dummy subscription for testing. * Proper implementation with event loop. * Finalized pubsub. * Bump clap. * Fix yml. * Disable WS logs. * Remove stale TransactionHash mention * Fix build from nightly API change. * Don't panic on invalid port. * Bind server to random port. * Send only best blocks. --- polkadot/api/src/lib.rs | 3 +- polkadot/cli/src/cli.yml | 7 +++- polkadot/cli/src/lib.rs | 79 +++++++++++++++++++++++++++++----------- 3 files changed, 64 insertions(+), 25 deletions(-) diff --git a/polkadot/api/src/lib.rs b/polkadot/api/src/lib.rs index e9f422503d..e178f1d547 100644 --- a/polkadot/api/src/lib.rs +++ b/polkadot/api/src/lib.rs @@ -355,7 +355,6 @@ impl BlockBuilder for ClientBlockBuilder #[cfg(test)] mod tests { use super::*; - use runtime_io::with_externalities; use keyring::Keyring; use codec::Slicable; use client::in_mem::Backend as InMemory; @@ -388,7 +387,7 @@ mod tests { ::client::new_in_mem( LocalDispatch::new(), || { - let mut storage = genesis_config.build_externalities(); + let storage = genesis_config.build_externalities(); let block = ::client::genesis::construct_genesis_block(&storage); (substrate_primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) } diff --git a/polkadot/cli/src/cli.yml b/polkadot/cli/src/cli.yml index 86ca40314a..07d3c0ec66 100644 --- a/polkadot/cli/src/cli.yml +++ b/polkadot/cli/src/cli.yml @@ -40,7 +40,12 @@ args: - rpc-port: long: rpc-port value_name: PORT - help: Specify RPC server TCP port + help: Specify HTTP RPC server TCP port + takes_value: true + - ws-port: + long: ws-port + value_name: PORT + help: Specify WebSockets RPC server TCP port takes_value: true - bootnodes: long: bootnodes diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 95a432b252..7f82a9bc3c 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -47,8 +47,9 @@ extern crate log; pub mod error; mod informant; -use std::path::{Path, PathBuf}; +use std::io; use std::net::SocketAddr; +use std::path::{Path, PathBuf}; use futures::sync::mpsc; use futures::{Sink, Future, Stream}; use tokio_core::reactor; @@ -117,43 +118,76 @@ pub fn run(args: I) -> error::Result<()> where }); config.roles = role; - config.network.boot_nodes = matches - .values_of("bootnodes") - .map_or(Default::default(), |v| v.map(|n| n.to_owned()).collect()); - config.network.config_path = Some(network_path(&base_path).to_string_lossy().into()); - config.network.net_config_path = config.network.config_path.clone(); + { + config.network.boot_nodes = matches + .values_of("bootnodes") + .map_or(Default::default(), |v| v.map(|n| n.to_owned()).collect()); + config.network.config_path = Some(network_path(&base_path).to_string_lossy().into()); + config.network.net_config_path = config.network.config_path.clone(); - let port = match matches.value_of("port") { - Some(port) => port.parse().expect("Invalid p2p port value specified."), - None => 30333, - }; - config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port)); - config.network.public_address = None; - config.network.client_version = format!("parity-polkadot/{}", crate_version!()); + let port = match matches.value_of("port") { + Some(port) => port.parse().expect("Invalid p2p port value specified."), + None => 30333, + }; + config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port)); + config.network.public_address = None; + config.network.client_version = format!("parity-polkadot/{}", crate_version!()); + } config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect(); let service = service::Service::new(config)?; - let mut address: SocketAddr = "127.0.0.1:9933".parse().unwrap(); - if let Some(port) = matches.value_of("rpc-port") { - let rpc_port: u16 = port.parse().expect("Invalid RPC port value specified."); - address.set_port(rpc_port); - } - - let handler = rpc::rpc_handler(service.client(), service.transaction_pool(), service.client()); - let _server = rpc::start_http(&address, handler)?; - informant::start(&service, core.handle()); let (exit_send, exit) = mpsc::channel(1); ctrlc::CtrlC::set_handler(move || { exit_send.clone().send(()).wait().expect("Error sending exit notification"); }); + + let _rpc_servers = { + let http_address = parse_address("127.0.0.1:9933", "rpc-port", &matches)?; + let ws_address = parse_address("127.0.0.1:9944", "ws-port", &matches)?; + + let handler = || { + let chain = rpc::apis::chain::Chain::new(service.client(), core.remote()); + rpc::rpc_handler(service.client(), chain, service.transaction_pool()) + }; + ( + start_server(http_address, |address| rpc::start_http(address, handler())), + start_server(ws_address, |address| rpc::start_ws(address, handler())), + ) + }; + core.run(exit.into_future()).expect("Error running informant event loop"); Ok(()) } +fn start_server(mut address: SocketAddr, start: F) -> Result where + F: Fn(&SocketAddr) -> Result, +{ + start(&address) + .or_else(|e| match e.kind() { + io::ErrorKind::AddrInUse | + io::ErrorKind::PermissionDenied => { + warn!("Unable to bind server to {}. Trying random port.", address); + address.set_port(0); + start(&address) + }, + _ => Err(e), + }) +} + +fn parse_address(default: &str, port_param: &str, matches: &clap::ArgMatches) -> Result { + let mut address: SocketAddr = default.parse().unwrap(); + if let Some(port) = matches.value_of(port_param) { + let port: u16 = port.parse().ok().ok_or(format!("Invalid port for --{} specified.", port_param))?; + address.set_port(port); + } + + Ok(address) +} + fn keystore_path(base_path: &Path) -> PathBuf { let mut path = base_path.to_owned(); path.push("keystore"); @@ -183,6 +217,7 @@ fn default_base_path() -> PathBuf { fn init_logger(pattern: &str) { let mut builder = env_logger::LogBuilder::new(); // Disable info logging by default for some modules: + builder.filter(Some("ws"), log::LogLevelFilter::Warn); builder.filter(Some("hyper"), log::LogLevelFilter::Warn); // Enable info for others. builder.filter(None, log::LogLevelFilter::Info);