Chain head subscription (#126)

* Start WebSockets server.

* Expose non-working subscription.

* Dummy subscription for testing.

* Proper implementation with event loop.

* Finalized pubsub.

* Bump clap.

* Fix yml.

* Disable WS logs.

* Remove stale TransactionHash mention

* Fix build from nightly API change.

* Don't panic on invalid port.

* Bind server to random port.

* Send only best blocks.
This commit is contained in:
Tomasz Drwięga
2018-04-17 13:03:57 +02:00
committed by Gav Wood
parent eb6d142846
commit e253a4cb9f
18 changed files with 464 additions and 107 deletions
+6 -1
View File
@@ -40,7 +40,12 @@ args:
- rpc-port:
long: rpc-port
value_name: PORT
help: Specify RPC server TCP port
help: Specify HTTP RPC server TCP port
takes_value: true
- ws-port:
long: ws-port
value_name: PORT
help: Specify WebSockets RPC server TCP port
takes_value: true
- bootnodes:
long: bootnodes
+57 -22
View File
@@ -47,8 +47,9 @@ extern crate log;
pub mod error;
mod informant;
use std::path::{Path, PathBuf};
use std::io;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use futures::sync::mpsc;
use futures::{Sink, Future, Stream};
use tokio_core::reactor;
@@ -117,43 +118,76 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
});
config.roles = role;
config.network.boot_nodes = matches
.values_of("bootnodes")
.map_or(Default::default(), |v| v.map(|n| n.to_owned()).collect());
config.network.config_path = Some(network_path(&base_path).to_string_lossy().into());
config.network.net_config_path = config.network.config_path.clone();
{
config.network.boot_nodes = matches
.values_of("bootnodes")
.map_or(Default::default(), |v| v.map(|n| n.to_owned()).collect());
config.network.config_path = Some(network_path(&base_path).to_string_lossy().into());
config.network.net_config_path = config.network.config_path.clone();
let port = match matches.value_of("port") {
Some(port) => port.parse().expect("Invalid p2p port value specified."),
None => 30333,
};
config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port));
config.network.public_address = None;
config.network.client_version = format!("parity-polkadot/{}", crate_version!());
let port = match matches.value_of("port") {
Some(port) => port.parse().expect("Invalid p2p port value specified."),
None => 30333,
};
config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port));
config.network.public_address = None;
config.network.client_version = format!("parity-polkadot/{}", crate_version!());
}
config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect();
let service = service::Service::new(config)?;
let mut address: SocketAddr = "127.0.0.1:9933".parse().unwrap();
if let Some(port) = matches.value_of("rpc-port") {
let rpc_port: u16 = port.parse().expect("Invalid RPC port value specified.");
address.set_port(rpc_port);
}
let handler = rpc::rpc_handler(service.client(), service.transaction_pool(), service.client());
let _server = rpc::start_http(&address, handler)?;
informant::start(&service, core.handle());
let (exit_send, exit) = mpsc::channel(1);
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).wait().expect("Error sending exit notification");
});
let _rpc_servers = {
let http_address = parse_address("127.0.0.1:9933", "rpc-port", &matches)?;
let ws_address = parse_address("127.0.0.1:9944", "ws-port", &matches)?;
let handler = || {
let chain = rpc::apis::chain::Chain::new(service.client(), core.remote());
rpc::rpc_handler(service.client(), chain, service.transaction_pool())
};
(
start_server(http_address, |address| rpc::start_http(address, handler())),
start_server(ws_address, |address| rpc::start_ws(address, handler())),
)
};
core.run(exit.into_future()).expect("Error running informant event loop");
Ok(())
}
fn start_server<T, F>(mut address: SocketAddr, start: F) -> Result<T, io::Error> where
F: Fn(&SocketAddr) -> Result<T, io::Error>,
{
start(&address)
.or_else(|e| match e.kind() {
io::ErrorKind::AddrInUse |
io::ErrorKind::PermissionDenied => {
warn!("Unable to bind server to {}. Trying random port.", address);
address.set_port(0);
start(&address)
},
_ => Err(e),
})
}
fn parse_address(default: &str, port_param: &str, matches: &clap::ArgMatches) -> Result<SocketAddr, String> {
let mut address: SocketAddr = default.parse().unwrap();
if let Some(port) = matches.value_of(port_param) {
let port: u16 = port.parse().ok().ok_or(format!("Invalid port for --{} specified.", port_param))?;
address.set_port(port);
}
Ok(address)
}
fn keystore_path(base_path: &Path) -> PathBuf {
let mut path = base_path.to_owned();
path.push("keystore");
@@ -183,6 +217,7 @@ fn default_base_path() -> PathBuf {
fn init_logger(pattern: &str) {
let mut builder = env_logger::LogBuilder::new();
// Disable info logging by default for some modules:
builder.filter(Some("ws"), log::LogLevelFilter::Warn);
builder.filter(Some("hyper"), log::LogLevelFilter::Warn);
// Enable info for others.
builder.filter(None, log::LogLevelFilter::Info);