mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 01:41:03 +00:00
Chain head subscription (#126)
* Start WebSockets server. * Expose non-working subscription. * Dummy subscription for testing. * Proper implementation with event loop. * Finalized pubsub. * Bump clap. * Fix yml. * Disable WS logs. * Remove stale TransactionHash mention * Fix build from nightly API change. * Don't panic on invalid port. * Bind server to random port. * Send only best blocks.
This commit is contained in:
@@ -355,7 +355,6 @@ impl<S: state_machine::Backend> BlockBuilder for ClientBlockBuilder<S>
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use runtime_io::with_externalities;
|
|
||||||
use keyring::Keyring;
|
use keyring::Keyring;
|
||||||
use codec::Slicable;
|
use codec::Slicable;
|
||||||
use client::in_mem::Backend as InMemory;
|
use client::in_mem::Backend as InMemory;
|
||||||
@@ -388,7 +387,7 @@ mod tests {
|
|||||||
::client::new_in_mem(
|
::client::new_in_mem(
|
||||||
LocalDispatch::new(),
|
LocalDispatch::new(),
|
||||||
|| {
|
|| {
|
||||||
let mut storage = genesis_config.build_externalities();
|
let storage = genesis_config.build_externalities();
|
||||||
let block = ::client::genesis::construct_genesis_block(&storage);
|
let block = ::client::genesis::construct_genesis_block(&storage);
|
||||||
(substrate_primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
|
(substrate_primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -40,7 +40,12 @@ args:
|
|||||||
- rpc-port:
|
- rpc-port:
|
||||||
long: rpc-port
|
long: rpc-port
|
||||||
value_name: PORT
|
value_name: PORT
|
||||||
help: Specify RPC server TCP port
|
help: Specify HTTP RPC server TCP port
|
||||||
|
takes_value: true
|
||||||
|
- ws-port:
|
||||||
|
long: ws-port
|
||||||
|
value_name: PORT
|
||||||
|
help: Specify WebSockets RPC server TCP port
|
||||||
takes_value: true
|
takes_value: true
|
||||||
- bootnodes:
|
- bootnodes:
|
||||||
long: bootnodes
|
long: bootnodes
|
||||||
|
|||||||
+57
-22
@@ -47,8 +47,9 @@ extern crate log;
|
|||||||
pub mod error;
|
pub mod error;
|
||||||
mod informant;
|
mod informant;
|
||||||
|
|
||||||
use std::path::{Path, PathBuf};
|
use std::io;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
use futures::sync::mpsc;
|
use futures::sync::mpsc;
|
||||||
use futures::{Sink, Future, Stream};
|
use futures::{Sink, Future, Stream};
|
||||||
use tokio_core::reactor;
|
use tokio_core::reactor;
|
||||||
@@ -117,43 +118,76 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
|
|||||||
});
|
});
|
||||||
|
|
||||||
config.roles = role;
|
config.roles = role;
|
||||||
config.network.boot_nodes = matches
|
{
|
||||||
.values_of("bootnodes")
|
config.network.boot_nodes = matches
|
||||||
.map_or(Default::default(), |v| v.map(|n| n.to_owned()).collect());
|
.values_of("bootnodes")
|
||||||
config.network.config_path = Some(network_path(&base_path).to_string_lossy().into());
|
.map_or(Default::default(), |v| v.map(|n| n.to_owned()).collect());
|
||||||
config.network.net_config_path = config.network.config_path.clone();
|
config.network.config_path = Some(network_path(&base_path).to_string_lossy().into());
|
||||||
|
config.network.net_config_path = config.network.config_path.clone();
|
||||||
|
|
||||||
let port = match matches.value_of("port") {
|
let port = match matches.value_of("port") {
|
||||||
Some(port) => port.parse().expect("Invalid p2p port value specified."),
|
Some(port) => port.parse().expect("Invalid p2p port value specified."),
|
||||||
None => 30333,
|
None => 30333,
|
||||||
};
|
};
|
||||||
config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port));
|
config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port));
|
||||||
config.network.public_address = None;
|
config.network.public_address = None;
|
||||||
config.network.client_version = format!("parity-polkadot/{}", crate_version!());
|
config.network.client_version = format!("parity-polkadot/{}", crate_version!());
|
||||||
|
}
|
||||||
|
|
||||||
config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect();
|
config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect();
|
||||||
|
|
||||||
let service = service::Service::new(config)?;
|
let service = service::Service::new(config)?;
|
||||||
|
|
||||||
let mut address: SocketAddr = "127.0.0.1:9933".parse().unwrap();
|
|
||||||
if let Some(port) = matches.value_of("rpc-port") {
|
|
||||||
let rpc_port: u16 = port.parse().expect("Invalid RPC port value specified.");
|
|
||||||
address.set_port(rpc_port);
|
|
||||||
}
|
|
||||||
|
|
||||||
let handler = rpc::rpc_handler(service.client(), service.transaction_pool(), service.client());
|
|
||||||
let _server = rpc::start_http(&address, handler)?;
|
|
||||||
|
|
||||||
informant::start(&service, core.handle());
|
informant::start(&service, core.handle());
|
||||||
|
|
||||||
let (exit_send, exit) = mpsc::channel(1);
|
let (exit_send, exit) = mpsc::channel(1);
|
||||||
ctrlc::CtrlC::set_handler(move || {
|
ctrlc::CtrlC::set_handler(move || {
|
||||||
exit_send.clone().send(()).wait().expect("Error sending exit notification");
|
exit_send.clone().send(()).wait().expect("Error sending exit notification");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let _rpc_servers = {
|
||||||
|
let http_address = parse_address("127.0.0.1:9933", "rpc-port", &matches)?;
|
||||||
|
let ws_address = parse_address("127.0.0.1:9944", "ws-port", &matches)?;
|
||||||
|
|
||||||
|
let handler = || {
|
||||||
|
let chain = rpc::apis::chain::Chain::new(service.client(), core.remote());
|
||||||
|
rpc::rpc_handler(service.client(), chain, service.transaction_pool())
|
||||||
|
};
|
||||||
|
(
|
||||||
|
start_server(http_address, |address| rpc::start_http(address, handler())),
|
||||||
|
start_server(ws_address, |address| rpc::start_ws(address, handler())),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
core.run(exit.into_future()).expect("Error running informant event loop");
|
core.run(exit.into_future()).expect("Error running informant event loop");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn start_server<T, F>(mut address: SocketAddr, start: F) -> Result<T, io::Error> where
|
||||||
|
F: Fn(&SocketAddr) -> Result<T, io::Error>,
|
||||||
|
{
|
||||||
|
start(&address)
|
||||||
|
.or_else(|e| match e.kind() {
|
||||||
|
io::ErrorKind::AddrInUse |
|
||||||
|
io::ErrorKind::PermissionDenied => {
|
||||||
|
warn!("Unable to bind server to {}. Trying random port.", address);
|
||||||
|
address.set_port(0);
|
||||||
|
start(&address)
|
||||||
|
},
|
||||||
|
_ => Err(e),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_address(default: &str, port_param: &str, matches: &clap::ArgMatches) -> Result<SocketAddr, String> {
|
||||||
|
let mut address: SocketAddr = default.parse().unwrap();
|
||||||
|
if let Some(port) = matches.value_of(port_param) {
|
||||||
|
let port: u16 = port.parse().ok().ok_or(format!("Invalid port for --{} specified.", port_param))?;
|
||||||
|
address.set_port(port);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(address)
|
||||||
|
}
|
||||||
|
|
||||||
fn keystore_path(base_path: &Path) -> PathBuf {
|
fn keystore_path(base_path: &Path) -> PathBuf {
|
||||||
let mut path = base_path.to_owned();
|
let mut path = base_path.to_owned();
|
||||||
path.push("keystore");
|
path.push("keystore");
|
||||||
@@ -183,6 +217,7 @@ fn default_base_path() -> PathBuf {
|
|||||||
fn init_logger(pattern: &str) {
|
fn init_logger(pattern: &str) {
|
||||||
let mut builder = env_logger::LogBuilder::new();
|
let mut builder = env_logger::LogBuilder::new();
|
||||||
// Disable info logging by default for some modules:
|
// Disable info logging by default for some modules:
|
||||||
|
builder.filter(Some("ws"), log::LogLevelFilter::Warn);
|
||||||
builder.filter(Some("hyper"), log::LogLevelFilter::Warn);
|
builder.filter(Some("hyper"), log::LogLevelFilter::Warn);
|
||||||
// Enable info for others.
|
// Enable info for others.
|
||||||
builder.filter(None, log::LogLevelFilter::Info);
|
builder.filter(None, log::LogLevelFilter::Info);
|
||||||
|
|||||||
Reference in New Issue
Block a user