Mirror of https://github.com/pezkuwichain/pezkuwi-telemetry.git (synced 2026-04-29 12:37:59 +00:00)
cargo fmt
@@ -1,29 +1,27 @@
-use std::net::SocketAddr;
-use hyper::{ Server, Request, Response, Body };
-use std::future::Future;
-use tokio_util::compat::{Compat,TokioAsyncReadCompatExt};
 use futures::io::{BufReader, BufWriter};
 use hyper::server::conn::AddrStream;
+use hyper::{Body, Request, Response, Server};
+use std::future::Future;
+use std::net::SocketAddr;
+use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};

 /// A convenience function to start up a Hyper server and handle requests.
 pub async fn start_server<H, F>(addr: SocketAddr, handler: H) -> Result<(), anyhow::Error>
 where
     H: Clone + Send + Sync + 'static + FnMut(SocketAddr, Request<Body>) -> F,
-    F: Send + 'static + Future<Output = Result<Response<Body>, anyhow::Error>>
+    F: Send + 'static + Future<Output = Result<Response<Body>, anyhow::Error>>,
 {
-    let service =
-        hyper::service::make_service_fn(move |addr: &AddrStream| {
-            let mut handler = handler.clone();
-            let addr = addr.remote_addr();
-            async move { Ok::<_, hyper::Error>(hyper::service::service_fn(move |r| handler(addr, r))) }
-        });
-    let server = Server::bind(&addr).serve(service);
+    let service = hyper::service::make_service_fn(move |addr: &AddrStream| {
+        let mut handler = handler.clone();
+        let addr = addr.remote_addr();
+        async move { Ok::<_, hyper::Error>(hyper::service::service_fn(move |r| handler(addr, r))) }
+    });
+    let server = Server::bind(&addr).serve(service);

-    log::info!("listening on http://{}", server.local_addr());
-    server.await?;
-
-    Ok(())
+    log::info!("listening on http://{}", server.local_addr());
+    server.await?;
+
+    Ok(())
 }

 type WsStream = BufReader<BufWriter<Compat<hyper::upgrade::Upgraded>>>;
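For orientation, a minimal sketch of how this helper might be invoked; the address, the `common::http_utils` module path and the trivial handler below are illustrative assumptions, not part of this commit:

    use hyper::{Body, Request, Response};
    use std::net::SocketAddr;

    #[tokio::main]
    async fn main() -> Result<(), anyhow::Error> {
        let addr: SocketAddr = "127.0.0.1:8000".parse()?;
        // The handler gets the peer address plus the request and returns a
        // future resolving to a response; here it always answers 200 OK.
        common::http_utils::start_server(addr, |_peer: SocketAddr, _req: Request<Body>| async move {
            Ok::<_, anyhow::Error>(Response::new(Body::from("OK")))
        })
        .await
    }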
@@ -31,27 +29,40 @@ pub type WsSender = soketto::connection::Sender<WsStream>;
 pub type WsReceiver = soketto::connection::Receiver<WsStream>;

 /// A convenience function to upgrade a Hyper request into a Soketto Websocket.
-pub fn upgrade_to_websocket<H,F>(req: Request<Body>, on_upgrade: H) -> hyper::Response<Body>
+pub fn upgrade_to_websocket<H, F>(req: Request<Body>, on_upgrade: H) -> hyper::Response<Body>
 where
     H: 'static + Send + FnOnce(WsSender, WsReceiver) -> F,
-    F: Send + Future<Output = ()>
+    F: Send + Future<Output = ()>,
 {
     if !is_upgrade_request(&req) {
-        return basic_response(400, "Expecting WebSocket upgrade headers");
+        return basic_response(400, "Expecting WebSocket upgrade headers");
     }

-    let key = match req.headers().get("Sec-WebSocket-Key") {
+    let key = match req.headers().get("Sec-WebSocket-Key") {
         Some(key) => key,
-        None => return basic_response(400, "Upgrade to websocket connection failed; Sec-WebSocket-Key header not provided")
+        None => {
+            return basic_response(
+                400,
+                "Upgrade to websocket connection failed; Sec-WebSocket-Key header not provided",
+            )
+        }
     };

-    if req.headers().get("Sec-WebSocket-Version").map(|v| v.as_bytes()) != Some(b"13") {
-        return basic_response(400, "Sec-WebSocket-Version header should have a value of 13");
-    }
+    if req
+        .headers()
+        .get("Sec-WebSocket-Version")
+        .map(|v| v.as_bytes())
+        != Some(b"13")
+    {
+        return basic_response(
+            400,
+            "Sec-WebSocket-Version header should have a value of 13",
+        );
+    }

-    // Just a little ceremony we need to go to to return the correct response key:
-    let mut accept_key_buf = [0; 32];
-    let accept_key = generate_websocket_accept_key(key.as_bytes(), &mut accept_key_buf);
+    // Just a little ceremony we need to go to to return the correct response key:
+    let mut accept_key_buf = [0; 32];
+    let accept_key = generate_websocket_accept_key(key.as_bytes(), &mut accept_key_buf);

     // Tell the client that we accept the upgrade-to-WS request:
     let response = Response::builder()
@@ -74,13 +85,11 @@ where
     };

     // Start a Soketto server with it:
-    let server = soketto::handshake::Server::new(
-        BufReader::new(BufWriter::new(stream.compat()))
-    );
+    let server =
+        soketto::handshake::Server::new(BufReader::new(BufWriter::new(stream.compat())));

     // Get hold of a way to send and receive messages:
-    let (sender, receiver)
-        = server.into_builder().finish();
+    let (sender, receiver) = server.into_builder().finish();

     // Pass these to our when-upgraded handler:
     on_upgrade(sender, receiver).await;
@@ -100,40 +109,48 @@ fn basic_response(code: u16, msg: impl AsRef<str>) -> Response<Body> {
 /// Defined in RFC 6455. this is how we convert the Sec-WebSocket-Key in a request into a
 /// Sec-WebSocket-Accept that we return in the response.
 fn generate_websocket_accept_key<'a>(key: &[u8], buf: &'a mut [u8; 32]) -> &'a [u8] {
-    // Defined in RFC 6455, we append this to the key to generate the response:
-    const KEY: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+    // Defined in RFC 6455, we append this to the key to generate the response:
+    const KEY: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

-    use sha1::{Digest, Sha1};
-    let mut digest = Sha1::new();
-    digest.update(key);
-    digest.update(KEY);
-    let d = digest.finalize();
+    use sha1::{Digest, Sha1};
+    let mut digest = Sha1::new();
+    digest.update(key);
+    digest.update(KEY);
+    let d = digest.finalize();

-    let n = base64::encode_config_slice(&d, base64::STANDARD, buf);
-    &buf[..n]
+    let n = base64::encode_config_slice(&d, base64::STANDARD, buf);
+    &buf[..n]
 }

 /// Check if a request is a websocket upgrade request.
 fn is_upgrade_request<B>(request: &hyper::Request<B>) -> bool {
-    header_contains_value(request.headers(), hyper::header::CONNECTION, b"upgrade")
-        && header_contains_value(request.headers(), hyper::header::UPGRADE, b"websocket")
+    header_contains_value(request.headers(), hyper::header::CONNECTION, b"upgrade")
+        && header_contains_value(request.headers(), hyper::header::UPGRADE, b"websocket")
 }

 /// Check if there is a header of the given name containing the wanted value.
-fn header_contains_value(headers: &hyper::HeaderMap, header: hyper::header::HeaderName, value: &[u8]) -> bool {
-    pub fn trim(x: &[u8]) -> &[u8] {
-        let from = match x.iter().position(|x| !x.is_ascii_whitespace()) {
-            Some(i) => i,
-            None => return &x[0..0],
-        };
-        let to = x.iter().rposition(|x| !x.is_ascii_whitespace()).unwrap();
-        &x[from..=to]
-    }
+fn header_contains_value(
+    headers: &hyper::HeaderMap,
+    header: hyper::header::HeaderName,
+    value: &[u8],
+) -> bool {
+    pub fn trim(x: &[u8]) -> &[u8] {
+        let from = match x.iter().position(|x| !x.is_ascii_whitespace()) {
+            Some(i) => i,
+            None => return &x[0..0],
+        };
+        let to = x.iter().rposition(|x| !x.is_ascii_whitespace()).unwrap();
+        &x[from..=to]
+    }

-    for header in headers.get_all(header) {
-        if header.as_bytes().split(|&c| c == b',').any(|x| trim(x).eq_ignore_ascii_case(value)) {
-            return true;
-        }
-    }
-    false
-}
+    for header in headers.get_all(header) {
+        if header
+            .as_bytes()
+            .split(|&c| c == b',')
+            .any(|x| trim(x).eq_ignore_ascii_case(value))
+        {
+            return true;
+        }
+    }
+    false
+}

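As a sanity check on the RFC 6455 logic above: the accept key is just base64(SHA-1(client key ++ GUID)). The sketch below (assuming the same `sha1` and `base64` crates this code uses) reproduces the worked example from RFC 6455, section 1.3:

    fn main() {
        use sha1::{Digest, Sha1};
        // Sample Sec-WebSocket-Key from RFC 6455, section 1.3:
        let key = b"dGhlIHNhbXBsZSBub25jZQ==";
        let mut digest = Sha1::new();
        digest.update(&key[..]);
        digest.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
        // The RFC gives this exact expected Sec-WebSocket-Accept value:
        let accept = base64::encode(digest.finalize());
        assert_eq!(accept, "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=");
    }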
@@ -1,11 +1,11 @@
|
||||
pub mod http_utils;
|
||||
pub mod id_type;
|
||||
pub mod internal_messages;
|
||||
pub mod node_message;
|
||||
pub mod node_types;
|
||||
pub mod ready_chunks_all;
|
||||
pub mod time;
|
||||
pub mod ws_client;
|
||||
pub mod ready_chunks_all;
|
||||
pub mod http_utils;
|
||||
|
||||
mod assign_id;
|
||||
mod dense_map;
|
||||
|
||||
@@ -9,12 +9,12 @@
 //! Code is adapted from the futures implementation
 //! (see [ready_chunks.rs](https://docs.rs/futures-util/0.3.15/src/futures_util/stream/stream/ready_chunks.rs.html)).

-use futures::stream::Fuse;
-use futures::StreamExt;
 use core::mem;
 use core::pin::Pin;
+use futures::stream::Fuse;
 use futures::stream::{FusedStream, Stream};
 use futures::task::{Context, Poll};
+use futures::StreamExt;
 use pin_project_lite::pin_project;

 pin_project! {
@@ -37,7 +37,7 @@ where
     pub fn new(stream: St) -> Self {
         Self {
             stream: stream.fuse(),
-            items: Vec::new()
+            items: Vec::new(),
         }
     }
 }

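For reference, the intended behaviour of this adapter (assuming it matches futures' `ReadyChunks` minus the size cap, as the module docs suggest): each poll drains every item that is already ready into one `Vec`. A hypothetical usage sketch:

    use common::ready_chunks_all::ReadyChunksAll;
    use futures::StreamExt;

    #[tokio::main]
    async fn main() {
        // All three items are immediately ready, so they arrive as a single
        // batch rather than via three separate polls.
        let mut chunks = ReadyChunksAll::new(futures::stream::iter([1, 2, 3]));
        assert_eq!(chunks.next().await, Some(vec![1, 2, 3]));
    }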
@@ -5,21 +5,22 @@ use tokio::net::TcpStream;
 use tokio_util::compat::TokioAsyncReadCompatExt;

 use super::{
-    sender::{ Sender, SentMessage, SentMessageInternal },
-    receiver::{ Receiver, RecvMessage }
+    receiver::{Receiver, RecvMessage},
+    sender::{Sender, SentMessage, SentMessageInternal},
 };

 /// The send side of a Soketto WebSocket connection
 pub type RawSender = soketto::connection::Sender<tokio_util::compat::Compat<tokio::net::TcpStream>>;

 /// The receive side of a Soketto WebSocket connection
-pub type RawReceiver = soketto::connection::Receiver<tokio_util::compat::Compat<tokio::net::TcpStream>>;
+pub type RawReceiver =
+    soketto::connection::Receiver<tokio_util::compat::Compat<tokio::net::TcpStream>>;

 /// A websocket connection. From this, we can either expose the raw connection
 /// or expose a cancel-safe interface to it.
 pub struct Connection {
     tx: soketto::connection::Sender<tokio_util::compat::Compat<tokio::net::TcpStream>>,
-    rx: soketto::connection::Receiver<tokio_util::compat::Compat<tokio::net::TcpStream>>
+    rx: soketto::connection::Receiver<tokio_util::compat::Compat<tokio::net::TcpStream>>,
 }

 impl Connection {
@@ -109,7 +110,7 @@ impl Connection {
                             );
                             break;
                         }
-                    },
+                    }
                     SentMessageInternal::Message(SentMessage::StaticText(s)) => {
                         if let Err(e) = ws_to_connection.send_text(s).await {
                             log::error!(
@@ -127,7 +128,7 @@ impl Connection {
                             );
                             break;
                         }
-                    },
+                    }
                     SentMessageInternal::Close => {
                         if let Err(e) = ws_to_connection.close().await {
                             log::error!("Error attempting to close connection: {}", e);
@@ -185,6 +186,6 @@ pub async fn connect(uri: &http::Uri) -> Result<Connection, ConnectError> {

     Ok(Connection {
         tx: ws_to_connection,
-        rx: ws_from_connection
+        rx: ws_from_connection,
     })
 }

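A minimal sketch of dialing out with this client; the URI here is a made-up example:

    use common::ws_client;

    async fn dial() -> Result<ws_client::Connection, ws_client::ConnectError> {
        let uri: http::Uri = "ws://127.0.0.1:8000/submit".parse().expect("valid URI");
        // Performs the TCP connect plus WebSocket handshake; the returned
        // `Connection` can then expose raw or channel-based send/receive halves.
        ws_client::connect(&uri).await
    }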
@@ -1,10 +1,10 @@
 /// Functionality to establish a connection
 mod connect;
-/// The channel based send interface
-mod sender;
 /// The channel based receive interface
 mod receiver;
+/// The channel based send interface
+mod sender;

-pub use connect::{ connect, ConnectError, Connection, RawSender, RawReceiver };
-pub use sender::{ Sender, SentMessage, SendError };
-pub use receiver::{ Receiver, RecvMessage, RecvError };
+pub use connect::{connect, ConnectError, Connection, RawReceiver, RawSender};
+pub use receiver::{Receiver, RecvError, RecvMessage};
+pub use sender::{SendError, Sender, SentMessage};

@@ -3,7 +3,7 @@ use futures::{Stream, StreamExt};

 /// Receive messages out of a connection
 pub struct Receiver {
-    pub (super) inner: mpsc::UnboundedReceiver<Result<RecvMessage, RecvError>>,
+    pub(super) inner: mpsc::UnboundedReceiver<Result<RecvMessage, RecvError>>,
 }

 #[derive(thiserror::Error, Debug)]
@@ -40,4 +40,4 @@ impl RecvMessage {
             RecvMessage::Text(s) => s.len(),
         }
     }
-}
+}

@@ -22,7 +22,7 @@ pub enum SentMessage {

 /// Messages sent into the channel interface can be anything publically visible, or a close message.
 #[derive(Debug, Clone)]
-pub (super) enum SentMessageInternal {
+pub(super) enum SentMessageInternal {
     Message(SentMessage),
     Close,
 }
@@ -30,7 +30,7 @@ pub (super) enum SentMessageInternal {
 /// Send messages into the connection
 #[derive(Clone)]
 pub struct Sender {
-    pub (super) inner: mpsc::UnboundedSender<SentMessageInternal>,
+    pub(super) inner: mpsc::UnboundedSender<SentMessageInternal>,
 }

 impl Sender {
@@ -56,7 +56,7 @@ impl Sender {
 #[derive(thiserror::Error, Debug, Clone)]
 pub enum SendError {
     #[error("Failed to send message: {0}")]
-    ChannelError(#[from] mpsc::SendError)
+    ChannelError(#[from] mpsc::SendError),
 }

 impl Sink<SentMessage> for Sender {
@@ -67,7 +67,10 @@ impl Sink<SentMessage> for Sender {
     ) -> std::task::Poll<Result<(), Self::Error>> {
         self.inner.poll_ready_unpin(cx).map_err(|e| e.into())
     }
-    fn start_send(mut self: std::pin::Pin<&mut Self>, item: SentMessage) -> Result<(), Self::Error> {
+    fn start_send(
+        mut self: std::pin::Pin<&mut Self>,
+        item: SentMessage,
+    ) -> Result<(), Self::Error> {
         self.inner
             .start_send_unpin(SentMessageInternal::Message(item))
             .map_err(|e| e.into())
@@ -84,4 +87,4 @@ impl Sink<SentMessage> for Sender {
     ) -> std::task::Poll<Result<(), Self::Error>> {
         self.inner.poll_close_unpin(cx).map_err(|e| e.into())
     }
-}
+}

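Because `Sender` implements `Sink<SentMessage>`, the futures `SinkExt` combinators work on it directly. A hedged sketch (assuming `StaticText` wraps a `&'static str`, as the connection loop's `send_text` call suggests, and that the `Sink` error type is `SendError`):

    use common::ws_client::{SendError, Sender, SentMessage};
    use futures::SinkExt;

    async fn say_hello(mut tx: Sender) -> Result<(), SendError> {
        // `send` drives poll_ready / start_send / poll_flush in sequence; the
        // message is queued on the internal unbounded channel for the
        // connection task to pick up.
        tx.send(SentMessage::StaticText("hello")).await
    }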
@@ -1,98 +1,98 @@
 use std::iter::FromIterator;

-use futures::StreamExt;
-use test_utils::workspace::start_server_release;
-use criterion::{criterion_group, criterion_main, Criterion};
-use tokio::runtime::Runtime;
-use serde_json::json;
 use common::node_types::BlockHash;
+use criterion::{criterion_group, criterion_main, Criterion};
+use futures::StreamExt;
+use serde_json::json;
+use test_utils::workspace::start_server_release;
+use tokio::runtime::Runtime;

 pub fn benchmark_throughput_single_shard(c: &mut Criterion) {
     /*
-    let rt = Runtime::new().expect("tokio runtime should start");
+    let rt = Runtime::new().expect("tokio runtime should start");

-    // Setup our server and node/feed connections first:
-    let (nodes, feeds) = rt.block_on(async {
-        let mut server = start_server_release().await;
-        let shard_id = server.add_shard().await.unwrap();
+    // Setup our server and node/feed connections first:
+    let (nodes, feeds) = rt.block_on(async {
+        let mut server = start_server_release().await;
+        let shard_id = server.add_shard().await.unwrap();

-        // Connect 1000 nodes to the shard:
-        let mut nodes = server
-            .get_shard(shard_id)
-            .unwrap()
-            .connect_multiple(1000)
-            .await
-            .expect("nodes can connect");
+        // Connect 1000 nodes to the shard:
+        let mut nodes = server
+            .get_shard(shard_id)
+            .unwrap()
+            .connect_multiple(1000)
+            .await
+            .expect("nodes can connect");

-        // Every node announces itself on the same chain:
-        for (idx, (node_tx, _)) in nodes.iter_mut().enumerate() {
-            node_tx.send_json_text(json!({
-                "id":1, // message ID, not node ID. Can be the same for all.
-                "ts":"2021-07-12T10:37:47.714666+01:00",
-                "payload": {
-                    "authority":true,
-                    "chain":"Local Testnet",
-                    "config":"",
-                    "genesis_hash": BlockHash::from_low_u64_ne(1),
-                    "implementation":"Substrate Node",
-                    "msg":"system.connected",
-                    "name": format!("Alice {}", idx),
-                    "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
-                    "startup_time":"1625565542717",
-                    "version":"2.0.0-07a1af348-aarch64-macos"
-                }
-            })).await.unwrap();
+        // Every node announces itself on the same chain:
+        for (idx, (node_tx, _)) in nodes.iter_mut().enumerate() {
+            node_tx.send_json_text(json!({
+                "id":1, // message ID, not node ID. Can be the same for all.
+                "ts":"2021-07-12T10:37:47.714666+01:00",
+                "payload": {
+                    "authority":true,
+                    "chain":"Local Testnet",
+                    "config":"",
+                    "genesis_hash": BlockHash::from_low_u64_ne(1),
+                    "implementation":"Substrate Node",
+                    "msg":"system.connected",
+                    "name": format!("Alice {}", idx),
+                    "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
+                    "startup_time":"1625565542717",
+                    "version":"2.0.0-07a1af348-aarch64-macos"
+                }
+            })).await.unwrap();
         }

-        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
-        // Start 1000 feeds:
-        let mut feeds = server
-            .get_core()
-            .connect_multiple(1)
-            .await
-            .expect("feeds can connect");
-
-        // // Subscribe all feeds to the chain:
-        // for (feed_tx, _) in feeds.iter_mut() {
-        //     feed_tx.send_command("subscribe", "Local Testnet").await.unwrap();
-        // }
-
-        println!("consuming feed");
-        {
-            let mut msgs = futures::stream::FuturesUnordered::from_iter(
-                feeds
-                    .iter_mut()
-                    .map(|(_,rx)| rx.recv_feed_messages())
-            );
-
-            let mut n = 0;
-            while let Some(Ok(msg)) = msgs.next().await {
-                n += 1;
-                println!("Message {}: {:?}", n, msg);
-            }
+        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
+        // Start 1000 feeds:
+        let mut feeds = server
+            .get_core()
+            .connect_multiple(1)
+            .await
+            .expect("feeds can connect");
+
+        // // Subscribe all feeds to the chain:
+        // for (feed_tx, _) in feeds.iter_mut() {
+        //     feed_tx.send_command("subscribe", "Local Testnet").await.unwrap();
+        // }
+
+        println!("consuming feed");
+        {
+            let mut msgs = futures::stream::FuturesUnordered::from_iter(
+                feeds
+                    .iter_mut()
+                    .map(|(_,rx)| rx.recv_feed_messages())
+            );
+
+            let mut n = 0;
+            while let Some(Ok(msg)) = msgs.next().await {
+                n += 1;
+                println!("Message {}: {:?}", n, msg);
+            }
         }

-        // // Consume any messages feeds have received so far (every feed should havea few at least):
-        // let feed_consumers = feeds
-        //     .iter_mut()
-        //     .map(|(_,rx)| rx.next());
-        // futures::future::join_all(feed_consumers).await;
-        println!("feed consumed");
-        (nodes, feeds)
-    });
+        // // Consume any messages feeds have received so far (every feed should havea few at least):
+        // let feed_consumers = feeds
+        //     .iter_mut()
+        //     .map(|(_,rx)| rx.next());
+        // futures::future::join_all(feed_consumers).await;
+        println!("feed consumed");
+        (nodes, feeds)
+    });

-    // Next, run criterion using the same tokio runtime to benchmark time taken to send
-    // messages to nodes and receive them from feeds.
-    c.bench_function(
-        "throughput time",
-        |b| b.to_async(&rt).iter(|| async {
+    // Next, run criterion using the same tokio runtime to benchmark time taken to send
+    // messages to nodes and receive them from feeds.
+    c.bench_function(
+        "throughput time",
+        |b| b.to_async(&rt).iter(|| async {

-            // TODO: Actually implement the benchmark.
+            // TODO: Actually implement the benchmark.

-        })
-    );
-    */
+        })
+    );
+    */
 }

 criterion_group!(benches, benchmark_throughput_single_shard);
-criterion_main!(benches);
+criterion_main!(benches);

@@ -74,7 +74,8 @@ impl Aggregator {
     /// Return a sink that a shard can send messages into to be handled by the aggregator.
     pub fn subscribe_shard(
         &self,
-    ) -> impl Sink<inner_loop::FromShardWebsocket, Error = anyhow::Error> + Send + Sync + Unpin + 'static {
+    ) -> impl Sink<inner_loop::FromShardWebsocket, Error = anyhow::Error> + Send + Sync + Unpin + 'static
+    {
         // Assign a unique aggregator-local ID to each connection that subscribes, and pass
         // that along with every message to the aggregator loop:
         let shard_conn_id = self
@@ -96,7 +97,8 @@ impl Aggregator {
     /// Return a sink that a feed can send messages into to be handled by the aggregator.
     pub fn subscribe_feed(
         &self,
-    ) -> impl Sink<inner_loop::FromFeedWebsocket, Error = anyhow::Error> + Send + Sync + Unpin + 'static {
+    ) -> impl Sink<inner_loop::FromFeedWebsocket, Error = anyhow::Error> + Send + Sync + Unpin + 'static
+    {
         // Assign a unique aggregator-local ID to each connection that subscribes, and pass
         // that along with every message to the aggregator loop:
         let feed_conn_id = self

@@ -179,11 +179,7 @@ impl InnerLoop {
     }

     /// Handle messages that come from the node geographical locator.
-    fn handle_from_find_location(
-        &mut self,
-        node_id: NodeId,
-        location: find_location::Location,
-    ) {
+    fn handle_from_find_location(&mut self, node_id: NodeId, location: find_location::Location) {
         self.node_state
             .update_node_location(node_id, location.clone());

@@ -227,20 +223,18 @@ impl InnerLoop {
         match self.node_state.add_node(genesis_hash, node) {
             state::AddNodeResult::ChainOnDenyList => {
                 if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) {
-                    let _ = shard_conn
-                        .unbounded_send(ToShardWebsocket::Mute {
-                            local_id,
-                            reason: MuteReason::ChainNotAllowed,
-                        });
+                    let _ = shard_conn.unbounded_send(ToShardWebsocket::Mute {
+                        local_id,
+                        reason: MuteReason::ChainNotAllowed,
+                    });
                 }
             }
             state::AddNodeResult::ChainOverQuota => {
                 if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) {
-                    let _ = shard_conn
-                        .unbounded_send(ToShardWebsocket::Mute {
-                            local_id,
-                            reason: MuteReason::Overquota,
-                        });
+                    let _ = shard_conn.unbounded_send(ToShardWebsocket::Mute {
+                        local_id,
+                        reason: MuteReason::Overquota,
+                    });
                 }
             }
             state::AddNodeResult::NodeAddedToChain(details) => {
@@ -473,10 +467,7 @@ impl InnerLoop {
     }

     /// Remove all of the node IDs provided and broadcast messages to feeds as needed.
-    fn remove_nodes_and_broadcast_result(
-        &mut self,
-        node_ids: impl IntoIterator<Item = NodeId>,
-    ) {
+    fn remove_nodes_and_broadcast_result(&mut self, node_ids: impl IntoIterator<Item = NodeId>) {
         // Group by chain to simplify the handling of feed messages:
         let mut node_ids_per_chain: HashMap<BlockHash, Vec<NodeId>> = HashMap::new();
         for node_id in node_ids.into_iter() {

@@ -3,19 +3,19 @@ mod feed_message;
 mod find_location;
 mod state;
 use std::str::FromStr;
-use tokio::time::{ Duration, Instant };
+use tokio::time::{Duration, Instant};

 use aggregator::{
     Aggregator, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket,
 };
 use bincode::Options;
+use common::http_utils;
 use common::internal_messages;
 use common::ready_chunks_all::ReadyChunksAll;
 use futures::{channel::mpsc, SinkExt, StreamExt};
+use hyper::{Method, Response};
 use simple_logger::SimpleLogger;
 use structopt::StructOpt;
-use hyper::{ Response, Method };
-use common::http_utils;

 const VERSION: &str = env!("CARGO_PKG_VERSION");
 const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
@@ -42,7 +42,7 @@ struct Opts {
     /// If it takes longer than this number of seconds to send the current batch of messages
     /// to a feed, the feed connection will be closed.
     #[structopt(long, default_value = "10")]
-    feed_timeout: u64
+    feed_timeout: u64,
 }

 #[tokio::main]
@@ -72,40 +72,55 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
         async move {
             match (req.method(), req.uri().path().trim_end_matches('/')) {
                 // Check that the server is up and running:
-                (&Method::GET, "/health") => {
-                    Ok(Response::new("OK".into()))
-                },
+                (&Method::GET, "/health") => Ok(Response::new("OK".into())),
                 // Subscribe to feed messages:
                 (&Method::GET, "/feed") => {
-                    Ok(http_utils::upgrade_to_websocket(req, move |ws_send, ws_recv| async move {
-                        let tx_to_aggregator = aggregator.subscribe_feed();
-                        let (mut tx_to_aggregator, mut ws_send)
-                            = handle_feed_websocket_connection(ws_send, ws_recv, tx_to_aggregator, feed_timeout).await;
-                        log::info!("Closing /feed connection from {:?}", addr);
-                        // Tell the aggregator that this connection has closed, so it can tidy up.
-                        let _ = tx_to_aggregator.send(FromFeedWebsocket::Disconnected).await;
-                        let _ = ws_send.close().await;
-                    }))
-                },
+                    Ok(http_utils::upgrade_to_websocket(
+                        req,
+                        move |ws_send, ws_recv| async move {
+                            let tx_to_aggregator = aggregator.subscribe_feed();
+                            let (mut tx_to_aggregator, mut ws_send) =
+                                handle_feed_websocket_connection(
+                                    ws_send,
+                                    ws_recv,
+                                    tx_to_aggregator,
+                                    feed_timeout,
+                                )
+                                .await;
+                            log::info!("Closing /feed connection from {:?}", addr);
+                            // Tell the aggregator that this connection has closed, so it can tidy up.
+                            let _ = tx_to_aggregator.send(FromFeedWebsocket::Disconnected).await;
+                            let _ = ws_send.close().await;
+                        },
+                    ))
+                }
                 // Subscribe to shard messages:
                 (&Method::GET, "/shard_submit") => {
-                    Ok(http_utils::upgrade_to_websocket(req, move |ws_send, ws_recv| async move {
-                        let tx_to_aggregator = aggregator.subscribe_shard();
-                        let (mut tx_to_aggregator, mut ws_send)
-                            = handle_shard_websocket_connection(ws_send, ws_recv, tx_to_aggregator).await;
-                        log::info!("Closing /shard_submit connection from {:?}", addr);
-                        // Tell the aggregator that this connection has closed, so it can tidy up.
-                        let _ = tx_to_aggregator.send(FromShardWebsocket::Disconnected).await;
-                        let _ = ws_send.close().await;
-                    }))
-                },
-                // 404 for anything else:
-                _ => {
-                    Ok(Response::builder()
-                        .status(404)
-                        .body("Not found".into())
-                        .unwrap())
+                    Ok(http_utils::upgrade_to_websocket(
+                        req,
+                        move |ws_send, ws_recv| async move {
+                            let tx_to_aggregator = aggregator.subscribe_shard();
+                            let (mut tx_to_aggregator, mut ws_send) =
+                                handle_shard_websocket_connection(
+                                    ws_send,
+                                    ws_recv,
+                                    tx_to_aggregator,
+                                )
+                                .await;
+                            log::info!("Closing /shard_submit connection from {:?}", addr);
+                            // Tell the aggregator that this connection has closed, so it can tidy up.
+                            let _ = tx_to_aggregator
+                                .send(FromShardWebsocket::Disconnected)
+                                .await;
+                            let _ = ws_send.close().await;
+                        },
+                    ))
+                }
+                // 404 for anything else:
+                _ => Ok(Response::builder()
+                    .status(404)
+                    .body("Not found".into())
+                    .unwrap()),
             }
         }
     });
@@ -156,29 +171,44 @@ where
             break;
         }
         if let Err(e) = msg_info {
-            log::error!("Shutting down websocket connection: Failed to receive data: {}", e);
+            log::error!(
+                "Shutting down websocket connection: Failed to receive data: {}",
+                e
+            );
             break;
         }

-        let msg: internal_messages::FromShardAggregator = match bincode::options().deserialize(&bytes) {
-            Ok(msg) => msg,
-            Err(e) => {
-                log::error!("Failed to deserialize message from shard; booting it: {}", e);
-                break;
-            }
-        };
+        let msg: internal_messages::FromShardAggregator =
+            match bincode::options().deserialize(&bytes) {
+                Ok(msg) => msg,
+                Err(e) => {
+                    log::error!(
+                        "Failed to deserialize message from shard; booting it: {}",
+                        e
+                    );
+                    break;
+                }
+            };

         // Convert and send to the aggregator:
         let aggregator_msg = match msg {
-            internal_messages::FromShardAggregator::AddNode { ip, node, local_id, genesis_hash } => {
-                FromShardWebsocket::Add { ip, node, genesis_hash, local_id }
+            internal_messages::FromShardAggregator::AddNode {
+                ip,
+                node,
+                local_id,
+                genesis_hash,
+            } => FromShardWebsocket::Add {
+                ip,
+                node,
+                genesis_hash,
+                local_id,
+            },
             internal_messages::FromShardAggregator::UpdateNode { payload, local_id } => {
                 FromShardWebsocket::Update { local_id, payload }
-            },
+            }
             internal_messages::FromShardAggregator::RemoveNode { local_id } => {
                 FromShardWebsocket::Remove { local_id }
-            },
+            }
         };

         if let Err(e) = tx_to_aggregator.send(aggregator_msg).await {
@@ -201,7 +231,7 @@ where

         let msg = match msg {
             Some(msg) => msg,
-            None => break
+            None => break,
         };

         let internal_msg = match msg {
@@ -218,9 +248,11 @@ where
             log::error!("Failed to send message to aggregator; closing shard: {}", e)
         }
         if let Err(e) = ws_send.flush().await {
-            log::error!("Failed to flush message to aggregator; closing shard: {}", e)
+            log::error!(
+                "Failed to flush message to aggregator; closing shard: {}",
+                e
+            )
         }

     }

     drop(recv_closer_tx); // Kill the recv task if this send task ends
@@ -241,7 +273,7 @@ async fn handle_feed_websocket_connection<S>(
     mut ws_send: http_utils::WsSender,
     mut ws_recv: http_utils::WsReceiver,
     mut tx_to_aggregator: S,
-    feed_timeout: u64
+    feed_timeout: u64,
 ) -> (S, http_utils::WsSender)
 where
     S: futures::Sink<FromFeedWebsocket, Error = anyhow::Error> + Unpin + Send + 'static,
@@ -281,29 +313,35 @@ where
             break;
         }
         if let Err(e) = msg_info {
-            log::error!("Shutting down websocket connection: Failed to receive data: {}", e);
+            log::error!(
+                "Shutting down websocket connection: Failed to receive data: {}",
+                e
+            );
             break;
         }

         // We ignore all but valid UTF8 text messages from the frontend:
         let text = match String::from_utf8(bytes) {
             Ok(s) => s,
-            Err(_) => continue
+            Err(_) => continue,
         };

         // Parse the message into a command we understand and send it to the aggregator:
         let cmd = match FromFeedWebsocket::from_str(&text) {
             Ok(cmd) => cmd,
             Err(e) => {
-                log::warn!("Ignoring invalid command '{}' from the frontend: {}", text, e);
-                continue
+                log::warn!(
+                    "Ignoring invalid command '{}' from the frontend: {}",
+                    text,
+                    e
+                );
+                continue;
             }
         };
         if let Err(e) = tx_to_aggregator.send(cmd).await {
             log::error!("Failed to send message to aggregator; closing feed: {}", e);
             break;
         }

     }

     drop(send_closer_tx); // Kill the send task if this recv task ends
@@ -323,15 +361,13 @@ where
         // End the loop when connection from aggregator ends:
         let msgs = match msgs {
             Some(msgs) => msgs,
-            None => break
+            None => break,
         };

         // There is only one message type at the mo; bytes to send
         // to the websocket. collect them all up to dispatch in one shot.
-        let all_msg_bytes = msgs.into_iter().map(|msg| {
-            match msg {
-                ToFeedWebsocket::Bytes(bytes) => bytes
-            }
+        let all_msg_bytes = msgs.into_iter().map(|msg| match msg {
+            ToFeedWebsocket::Bytes(bytes) => bytes,
         });

         // We have a deadline to send and flush messages. If the client isn't keeping up with our
@@ -340,7 +376,9 @@ where
         let message_send_deadline = Instant::now() + Duration::from_secs(feed_timeout);

         for bytes in all_msg_bytes {
-            match tokio::time::timeout_at(message_send_deadline, ws_send.send_binary(&bytes)).await {
+            match tokio::time::timeout_at(message_send_deadline, ws_send.send_binary(&bytes))
+                .await
+            {
                 Err(_) => {
                     log::warn!("Closing feed websocket that was too slow to keep up (1)");
                     break 'outer;
@@ -355,7 +393,7 @@ where
         match tokio::time::timeout_at(message_send_deadline, ws_send.flush()).await {
             Err(_) => {
                 log::warn!("Closing feed websocket that was too slow to keep up (2)");
-                break
+                break;
             }
             Ok(Err(e)) => {
                 log::warn!("Closing feed websocket due to error flushing data: {}", e);

@@ -6,7 +6,7 @@ use std::time::Duration;
 use test_utils::{
     assert_contains_matches,
     feed_message_de::{FeedMessage, NodeDetails},
-    workspace::{ start_server, CoreOpts, ShardOpts, start_server_debug }
+    workspace::{start_server, start_server_debug, CoreOpts, ShardOpts},
 };

 /// The simplest test we can run; the main benefit of this test (since we check similar)
@@ -55,7 +55,6 @@ async fn feed_ping_responded_to_with_pong() {
     server.shutdown().await;
 }

-
 /// As a prelude to `lots_of_mute_messages_dont_cause_a_deadlock`, we can check that
 /// a lot of nodes can simultaneously subscribe and are all sent the expected response.
 #[tokio::test]
@@ -70,10 +69,8 @@ async fn multiple_feeds_sent_version_on_connect() {
         .unwrap();

     // Wait for responses all at once:
-    let responses = futures::future::join_all(
-        feeds.iter_mut()
-            .map(|(_, rx)| rx.recv_feed_messages())
-    );
+    let responses =
+        futures::future::join_all(feeds.iter_mut().map(|(_, rx)| rx.recv_feed_messages()));

     let responses = tokio::time::timeout(Duration::from_secs(10), responses)
         .await
@@ -113,22 +110,24 @@ async fn lots_of_mute_messages_dont_cause_a_deadlock() {

     // Every node announces itself on the same chain:
     for (idx, (node_tx, _)) in nodes.iter_mut().enumerate() {
-        node_tx.send_json_text(json!({
-            "id":1, // message ID, not node ID. Can be the same for all.
-            "ts":"2021-07-12T10:37:47.714666+01:00",
-            "payload": {
-                "authority":true,
-                "chain":"Local Testnet",
-                "config":"",
-                "genesis_hash": BlockHash::from_low_u64_ne(1),
-                "implementation":"Substrate Node",
-                "msg":"system.connected",
-                "name": format!("Alice {}", idx),
-                "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
-                "startup_time":"1625565542717",
-                "version":"2.0.0-07a1af348-aarch64-macos"
-            }
-        })).unwrap();
+        node_tx
+            .send_json_text(json!({
+                "id":1, // message ID, not node ID. Can be the same for all.
+                "ts":"2021-07-12T10:37:47.714666+01:00",
+                "payload": {
+                    "authority":true,
+                    "chain":"Local Testnet",
+                    "config":"",
+                    "genesis_hash": BlockHash::from_low_u64_ne(1),
+                    "implementation":"Substrate Node",
+                    "msg":"system.connected",
+                    "name": format!("Alice {}", idx),
+                    "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
+                    "startup_time":"1625565542717",
+                    "version":"2.0.0-07a1af348-aarch64-macos"
+                }
+            }))
+            .unwrap();
     }

     // Wait a little time (just to let everything get deadlocked) before
@@ -144,11 +143,8 @@ async fn lots_of_mute_messages_dont_cause_a_deadlock() {
         .expect("feeds can connect");

     // Wait to see whether we get anything back:
-    let msgs_fut = futures::future::join_all(
-        feeds
-            .iter_mut()
-            .map(|(_,rx)| rx.recv_feed_messages())
-    );
+    let msgs_fut =
+        futures::future::join_all(feeds.iter_mut().map(|(_, rx)| rx.recv_feed_messages()));

     // Give up after a timeout:
     tokio::time::timeout(Duration::from_secs(10), msgs_fut)
@@ -234,29 +230,35 @@ async fn feeds_told_about_chain_rename_and_stay_subscribed() {
     .await
     .expect("can connect to shard");

-    let node_init_msg = |id, chain_name: &str, node_name: &str| json!({
-        "id":id,
-        "ts":"2021-07-12T10:37:47.714666+01:00",
-        "payload": {
-            "authority":true,
-            "chain": chain_name,
-            "config":"",
-            "genesis_hash": BlockHash::from_low_u64_ne(1),
-            "implementation":"Substrate Node",
-            "msg":"system.connected",
-            "name": node_name,
-            "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
-            "startup_time":"1625565542717",
-            "version":"2.0.0-07a1af348-aarch64-macos"
-        },
-    });
+    let node_init_msg = |id, chain_name: &str, node_name: &str| {
+        json!({
+            "id":id,
+            "ts":"2021-07-12T10:37:47.714666+01:00",
+            "payload": {
+                "authority":true,
+                "chain": chain_name,
+                "config":"",
+                "genesis_hash": BlockHash::from_low_u64_ne(1),
+                "implementation":"Substrate Node",
+                "msg":"system.connected",
+                "name": node_name,
+                "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
+                "startup_time":"1625565542717",
+                "version":"2.0.0-07a1af348-aarch64-macos"
+            },
+        })
+    };

     // Subscribe a chain:
-    node_tx.send_json_text(node_init_msg(1, "Initial chain name", "Node 1")).unwrap();
+    node_tx
+        .send_json_text(node_init_msg(1, "Initial chain name", "Node 1"))
+        .unwrap();

     // Connect a feed and subscribe to the above chain:
     let (mut feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap();
-    feed_tx.send_command("subscribe", "Initial chain name").unwrap();
+    feed_tx
+        .send_command("subscribe", "Initial chain name")
+        .unwrap();

     // Feed is told about the chain, and the node on this chain:
     let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
@@ -269,7 +271,9 @@ async fn feeds_told_about_chain_rename_and_stay_subscribed() {

     // Subscribe another node. The chain doesn't rename yet but we are told about the new node
     // count and the node that's been added.
-    node_tx.send_json_text(node_init_msg(2, "New chain name", "Node 2")).unwrap();
+    node_tx
+        .send_json_text(node_init_msg(2, "New chain name", "Node 2"))
+        .unwrap();
     let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
     assert_contains_matches!(
         feed_messages,
@@ -279,7 +283,9 @@ async fn feeds_told_about_chain_rename_and_stay_subscribed() {

     // Subscribe a third node. The chain renames, so we're told about the new node but also
     // about the chain rename.
-    node_tx.send_json_text(node_init_msg(3, "New chain name", "Node 3")).unwrap();
+    node_tx
+        .send_json_text(node_init_msg(3, "New chain name", "Node 3"))
+        .unwrap();
     let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
     assert_contains_matches!(
         feed_messages,
@@ -290,14 +296,15 @@ async fn feeds_told_about_chain_rename_and_stay_subscribed() {

     // Just to be sure, subscribing a fourth node on this chain will still lead to updates
     // to this feed.
-    node_tx.send_json_text(node_init_msg(4, "New chain name", "Node 4")).unwrap();
+    node_tx
+        .send_json_text(node_init_msg(4, "New chain name", "Node 4"))
+        .unwrap();
     let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
     assert_contains_matches!(
         feed_messages,
         FeedMessage::AddedNode { node: NodeDetails { name: node_name, .. }, ..} if node_name == "Node 4",
         FeedMessage::AddedChain { name, node_count: 4 } if name == "New chain name",
     );
-
 }

 /// If we add a couple of shards and a node for each, all feeds should be
@@ -386,7 +393,12 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() {
     // Start server, add shard, connect node:
     let mut server = start_server_debug().await;
     let shard_id = server.add_shard().await.unwrap();
-    let (mut node_tx, _node_rx) = server.get_shard(shard_id).unwrap().connect_node().await.unwrap();
+    let (mut node_tx, _node_rx) = server
+        .get_shard(shard_id)
+        .unwrap()
+        .connect_node()
+        .await
+        .unwrap();

     // Send a "system connected" message for a few nodes/chains:
     for id in 1..=3 {
@@ -419,7 +431,9 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() {
     assert_contains_matches!(feed_messages, AddedChain { name, node_count: 1 } if name == "Local Testnet 1");

     // Subscribe it to a chain
-    feed_tx.send_command("subscribe", "Local Testnet 1").unwrap();
+    feed_tx
+        .send_command("subscribe", "Local Testnet 1")
+        .unwrap();

     let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
     assert_contains_matches!(
@@ -450,7 +464,9 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() {
         .expect_err("Timeout should elapse since no messages sent");

     // We can change our subscription:
-    feed_tx.send_command("subscribe", "Local Testnet 2").unwrap();
+    feed_tx
+        .send_command("subscribe", "Local Testnet 2")
+        .unwrap();
     let feed_messages = feed_rx.recv_feed_messages().await.unwrap();

     // We are told about the subscription change and given similar on-subscribe messages to above.
@@ -484,36 +500,48 @@ async fn slow_feeds_are_disconnected() {
     let mut server = start_server(
         true,
         // Timeout faster so the test can be quicker:
-        CoreOpts { feed_timeout: Some(1) },
+        CoreOpts {
+            feed_timeout: Some(1),
+        },
         // Allow us to send more messages in more easily:
-        ShardOpts { max_nodes_per_connection: Some(100_000) }
-    ).await;
+        ShardOpts {
+            max_nodes_per_connection: Some(100_000),
+        },
+    )
+    .await;

     // Give us a shard to talk to:
     let shard_id = server.add_shard().await.unwrap();
-    let (mut node_tx, _node_rx) = server.get_shard(shard_id).unwrap().connect_node().await.unwrap();
+    let (mut node_tx, _node_rx) = server
+        .get_shard(shard_id)
+        .unwrap()
+        .connect_node()
+        .await
+        .unwrap();

     // Add a load of nodes from this shard so there's plenty of data to give to a feed.
     // We want to exhaust any buffers between core and feed (eg BufWriters). If the number
     // is too low, data will happily be sent into a buffer and the connection won't need to
     // be closed.
     for n in 1..50_000 {
-        node_tx.send_json_text(json!({
-            "id":n,
-            "ts":"2021-07-12T10:37:47.714666+01:00",
-            "payload": {
-                "authority":true,
-                "chain":"Polkadot",
-                "config":"",
-                "genesis_hash": BlockHash::from_low_u64_ne(1),
-                "implementation":"Substrate Node",
-                "msg":"system.connected",
-                "name": format!("Alice {}", n),
-                "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
-                "startup_time":"1625565542717",
-                "version":"2.0.0-07a1af348-aarch64-macos"
-            }
-        })).unwrap();
+        node_tx
+            .send_json_text(json!({
+                "id":n,
+                "ts":"2021-07-12T10:37:47.714666+01:00",
+                "payload": {
+                    "authority":true,
+                    "chain":"Polkadot",
+                    "config":"",
+                    "genesis_hash": BlockHash::from_low_u64_ne(1),
+                    "implementation":"Substrate Node",
+                    "msg":"system.connected",
+                    "name": format!("Alice {}", n),
+                    "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
+                    "startup_time":"1625565542717",
+                    "version":"2.0.0-07a1af348-aarch64-macos"
+                }
+            }))
+            .unwrap();
     }

     // Connect a raw feed so that we can control how fast we consume data from the websocket
@@ -530,10 +558,8 @@ async fn slow_feeds_are_disconnected() {
     // waiting to receive mroe data (or see some other error).
     loop {
         let mut v = Vec::new();
-        let data = tokio::time::timeout(
-            Duration::from_secs(1),
-            raw_feed_rx.receive_data(&mut v)
-        ).await;
+        let data =
+            tokio::time::timeout(Duration::from_secs(1), raw_feed_rx.receive_data(&mut v)).await;

         match data {
             Ok(Ok(_)) => {
@@ -541,13 +567,13 @@ async fn slow_feeds_are_disconnected() {
             }
             Ok(Err(soketto::connection::Error::Closed)) => {
                 break; // End loop; success!
-            },
+            }
             Ok(Err(e)) => {
                 panic!("recv should be closed but instead we saw this error: {}", e);
-            },
+            }
             Err(_) => {
                 panic!("recv should be closed but seems to be happy waiting for more data");
-            },
+            }
         }
     }
 }

@@ -564,49 +590,87 @@ async fn max_nodes_per_connection_is_enforced() {
         false,
         CoreOpts::default(),
         // Limit max nodes per connection to 2; any other msgs should be ignored.
-        ShardOpts { max_nodes_per_connection: Some(2) }
-    ).await;
+        ShardOpts {
+            max_nodes_per_connection: Some(2),
+        },
+    )
+    .await;

     // Connect to a shard
     let shard_id = server.add_shard().await.unwrap();
-    let (mut node_tx, _node_rx) = server.get_shard(shard_id).unwrap().connect_node().await.unwrap();
+    let (mut node_tx, _node_rx) = server
+        .get_shard(shard_id)
+        .unwrap()
+        .connect_node()
+        .await
+        .unwrap();

     // Connect a feed.
     let (mut feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap();

     // We'll send these messages from the node:
-    let json_msg = |n| json!({
-        "id":n,
-        "ts":"2021-07-12T10:37:47.714666+01:00",
-        "payload": {
-            "authority":true,
-            "chain":"Test Chain",
-            "config":"",
-            "genesis_hash": BlockHash::from_low_u64_ne(1),
-            "implementation":"Polkadot",
-            "msg":"system.connected",
-            "name": format!("Alice {}", n),
-            "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
-            "startup_time":"1625565542717",
-            "version":"2.0.0-07a1af348-aarch64-macos"
-        }
-    });
+    let json_msg = |n| {
+        json!({
+            "id":n,
+            "ts":"2021-07-12T10:37:47.714666+01:00",
+            "payload": {
+                "authority":true,
+                "chain":"Test Chain",
+                "config":"",
+                "genesis_hash": BlockHash::from_low_u64_ne(1),
+                "implementation":"Polkadot",
+                "msg":"system.connected",
+                "name": format!("Alice {}", n),
+                "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
+                "startup_time":"1625565542717",
+                "version":"2.0.0-07a1af348-aarch64-macos"
+            }
+        })
+    };

     // First message ID should lead to feed messages:
     node_tx.send_json_text(json_msg(1)).unwrap();
-    assert_ne!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0);
+    assert_ne!(
+        feed_rx
+            .recv_feed_messages_timeout(Duration::from_secs(1))
+            .await
+            .unwrap()
+            .len(),
+        0
+    );

     // Second message ID should lead to feed messages as well:
     node_tx.send_json_text(json_msg(2)).unwrap();
-    assert_ne!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0);
+    assert_ne!(
+        feed_rx
+            .recv_feed_messages_timeout(Duration::from_secs(1))
+            .await
+            .unwrap()
+            .len(),
+        0
+    );

     // Third message ID should be ignored:
     node_tx.send_json_text(json_msg(3)).unwrap();
-    assert_eq!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0);
+    assert_eq!(
+        feed_rx
+            .recv_feed_messages_timeout(Duration::from_secs(1))
+            .await
+            .unwrap()
+            .len(),
+        0
+    );

     // Forth message ID should be ignored as well:
     node_tx.send_json_text(json_msg(4)).unwrap();
-    assert_eq!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0);
+    assert_eq!(
+        feed_rx
+            .recv_feed_messages_timeout(Duration::from_secs(1))
+            .await
+            .unwrap()
+            .len(),
+        0
+    );

     // (now that the chain "Test Chain" is known about, subscribe to it for update messages.
     // This wasn't needed to receive messages re the above since everybody hears about node
@@ -619,25 +683,53 @@ async fn max_nodes_per_connection_is_enforced() {
     node_tx.send_json_text(json!(
         {"id":1, "payload":{ "bandwidth_download":576,"bandwidth_upload":576,"msg":"system.interval","peers":1},"ts":"2021-07-12T10:38:48.330433+01:00" }
     )).unwrap();
-    assert_ne!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0);
+    assert_ne!(
+        feed_rx
+            .recv_feed_messages_timeout(Duration::from_secs(1))
+            .await
+            .unwrap()
+            .len(),
+        0
+    );

     node_tx.send_json_text(json!(
         {"id":2, "payload":{ "bandwidth_download":576,"bandwidth_upload":576,"msg":"system.interval","peers":1},"ts":"2021-07-12T10:38:48.330433+01:00" }
     )).unwrap();
-    assert_ne!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0);
+    assert_ne!(
+        feed_rx
+            .recv_feed_messages_timeout(Duration::from_secs(1))
+            .await
+            .unwrap()
+            .len(),
+        0
+    );

     // Updates about ignored IDs are still ignored:

     node_tx.send_json_text(json!(
         {"id":3, "payload":{ "bandwidth_download":576,"bandwidth_upload":576,"msg":"system.interval","peers":1},"ts":"2021-07-12T10:38:48.330433+01:00" }
     )).unwrap();
-    assert_eq!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0);
+    assert_eq!(
+        feed_rx
+            .recv_feed_messages_timeout(Duration::from_secs(1))
+            .await
+            .unwrap()
+            .len(),
+        0
+    );

     node_tx.send_json_text(json!(
         {"id":4, "payload":{ "bandwidth_download":576,"bandwidth_upload":576,"msg":"system.interval","peers":1},"ts":"2021-07-12T10:38:48.330433+01:00" }
     )).unwrap();
-    assert_eq!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0);
+    assert_eq!(
+        feed_rx
+            .recv_feed_messages_timeout(Duration::from_secs(1))
+            .await
+            .unwrap()
+            .len(),
+        0
+    );

     // Tidy up:
     server.shutdown().await;
-}
+}

@@ -18,15 +18,15 @@ In general, if you run into issues, it may be better to run this on a linux
 box; MacOS seems to hit limits quicker in general.
 */

-use futures::{ StreamExt };
+use common::node_types::BlockHash;
+use common::ws_client::SentMessage;
+use futures::StreamExt;
+use serde_json::json;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
 use structopt::StructOpt;
 use test_utils::workspace::start_server_release;
-use common::ws_client::{ SentMessage };
-use serde_json::json;
-use std::time::Duration;
-use std::sync::atomic::{ Ordering, AtomicUsize };
-use std::sync::Arc;
-use common::node_types::BlockHash;

 /// A configurable soak_test runner. Configure by providing the expected args as
 /// an environment variable. One example to run this test is:
@@ -78,22 +78,24 @@ async fn run_soak_test(opts: SoakTestOpts) {

     // Each node tells the shard about itself:
     for (idx, (node_tx, _)) in nodes.iter_mut().enumerate() {
-        node_tx.send_json_binary(json!({
-            "id":1, // Only needs to be unique per node
-            "ts":"2021-07-12T10:37:47.714666+01:00",
-            "payload": {
-                "authority":true,
-                "chain": "Polkadot", // <- so that we don't go over quota with lots of nodes.
-                "config":"",
-                "genesis_hash": BlockHash::from_low_u64_ne(1),
-                "implementation":"Substrate Node",
-                "msg":"system.connected",
-                "name": format!("Node #{}", idx),
-                "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
-                "startup_time":"1625565542717",
-                "version":"2.0.0-07a1af348-aarch64-macos"
-            },
-        })).unwrap();
+        node_tx
+            .send_json_binary(json!({
+                "id":1, // Only needs to be unique per node
+                "ts":"2021-07-12T10:37:47.714666+01:00",
+                "payload": {
+                    "authority":true,
+                    "chain": "Polkadot", // <- so that we don't go over quota with lots of nodes.
+                    "config":"",
+                    "genesis_hash": BlockHash::from_low_u64_ne(1),
+                    "implementation":"Substrate Node",
+                    "msg":"system.connected",
+                    "name": format!("Node #{}", idx),
+                    "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
+                    "startup_time":"1625565542717",
+                    "version":"2.0.0-07a1af348-aarch64-macos"
+                },
+            }))
+            .unwrap();
     }

     // Connect feeds to the core:
@@ -127,12 +129,15 @@ async fn run_soak_test(opts: SoakTestOpts) {
         loop {
             // every ~1second we aim to have sent messages from all of the nodes. So we cycle through
            // the node IDs and send a message from each at roughly 1s / number_of_nodes.
-            let mut interval = tokio::time::interval(Duration::from_secs_f64(1.0 / nodes.len() as f64));
+            let mut interval =
+                tokio::time::interval(Duration::from_secs_f64(1.0 / nodes.len() as f64));

             for node_id in (0..nodes.len()).cycle() {
                 interval.tick().await;
                 let node_tx = &mut nodes[node_id].0;
-                node_tx.unbounded_send(SentMessage::StaticBinary(msg_bytes)).unwrap();
+                node_tx
+                    .unbounded_send(SentMessage::StaticBinary(msg_bytes))
+                    .unwrap();
                 bytes_in2.fetch_add(msg_bytes.len(), Ordering::Relaxed);
             }
         }
@@ -162,7 +167,8 @@ async fn run_soak_test(opts: SoakTestOpts) {
         let bytes_in_val = bytes_in.load(Ordering::Relaxed);
         let bytes_out_val = bytes_out.load(Ordering::Relaxed);

-        println!("#{}: MB in/out per measurement: {:.4} / {:.4}, total bytes in/out: {} / {})",
+        println!(
+            "#{}: MB in/out per measurement: {:.4} / {:.4}, total bytes in/out: {} / {})",
             n,
             (bytes_in_val - last_bytes_in) as f64 / one_mb,
             (bytes_out_val - last_bytes_out) as f64 / one_mb,
@@ -193,19 +199,18 @@ struct SoakTestOpts {
     feeds: usize,
     /// The number of nodes to connect to each feed
     #[structopt(long)]
-    nodes: usize
+    nodes: usize,
 }

 /// Get soak test args from an envvar and parse them via structopt.
 fn get_soak_test_opts() -> SoakTestOpts {
     let arg_string = std::env::var("SOAK_TEST_ARGS")
         .expect("Expecting args to be provided in the env var SOAK_TEST_ARGS");
-    let args = shellwords::split(&arg_string)
-        .expect("Could not parse SOAK_TEST_ARGS as shell arguments");
+    let args =
+        shellwords::split(&arg_string).expect("Could not parse SOAK_TEST_ARGS as shell arguments");

     // The binary name is expected to be the first arg, so fake it:
-    let all_args = std::iter::once("soak_test".to_owned())
-        .chain(args.into_iter());
+    let all_args = std::iter::once("soak_test".to_owned()).chain(args.into_iter());

     SoakTestOpts::from_iter(all_args)
-}
+}

@@ -5,7 +5,7 @@ use common::{
node_types::BlockHash,
AssignId,
};
use futures::{channel::mpsc};
use futures::channel::mpsc;
use futures::{Sink, SinkExt, StreamExt};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
@@ -101,7 +101,7 @@ impl Aggregator {
};
if let Err(_) = tx_to_aggregator2.send(msg_to_aggregator).await {
// This will close the ws channels, which themselves log messages.
break
break;
}
}
});

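The `break` on a failed send doubles as the shutdown signal here: once the aggregator's receiving end is gone, the forwarding task ends and the websocket channels it holds are dropped and closed. A reduced sketch of that forwarding shape (illustrative types, not the aggregator's real ones):

use futures::{channel::mpsc, SinkExt, StreamExt};

async fn forward(mut rx: mpsc::Receiver<u32>, mut tx: mpsc::Sender<u32>) {
    while let Some(msg) = rx.next().await {
        if tx.send(msg).await.is_err() {
            // Receiver is gone; stop forwarding. Dropping rx on exit closes
            // the upstream channel too, propagating the shutdown.
            break;
        }
    }
}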
@@ -1,7 +1,7 @@
use bincode::Options;
use common::ws_client;
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use common::ws_client;
use bincode::Options;

#[derive(Clone, Debug)]
pub enum Message<Out> {
@@ -49,7 +49,7 @@ where
if let Err(e) = tx_out.send(Message::Connected).await {
// If receiving end is closed, bail now.
log::warn!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e);
return
return;
}

// Loop, forwarding messages to and from the core until something goes wrong.
@@ -103,7 +103,7 @@ where
}
};
}
},
}
Err(connect_err) => {
// Issue connecting? Wait and try again on the next loop iteration.
log::error!(
@@ -127,4 +127,4 @@ where
});

(tx_in, rx_out)
}
}

@@ -1,5 +1,4 @@
#[warn(missing_docs)]

mod aggregator;
mod connection;
mod json_message;
@@ -8,13 +7,13 @@ mod real_ip;
use std::{collections::HashSet, net::IpAddr};

use aggregator::{Aggregator, FromWebsocket};
use common::http_utils;
use common::node_message;
use futures::{channel::mpsc, SinkExt, StreamExt};
use http::Uri;
use hyper::{Method, Response};
use simple_logger::SimpleLogger;
use structopt::StructOpt;
use hyper::{ Response, Method };
use common::http_utils;

const VERSION: &str = env!("CARGO_PKG_VERSION");
const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
@@ -47,7 +46,7 @@ struct Opts {
/// This is important because without a limit, a single connection could exhaust
/// RAM by suggesting that it accounts for billions of nodes.
#[structopt(long, default_value = "20")]
max_nodes_per_connection: usize
max_nodes_per_connection: usize,
}
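The limit matters because each connection self-reports how many nodes it carries, so an unchecked client could claim billions. A purely hypothetical sketch of the kind of guard this option implies (not the shard's actual code):

// Hypothetical guard to run before accepting another node on a connection:
fn allow_new_node(current_nodes: usize, max_nodes_per_connection: usize) -> bool {
    if current_nodes >= max_nodes_per_connection {
        log::warn!(
            "Ignoring new node: per-connection limit of {} reached",
            max_nodes_per_connection
        );
        return false;
    }
    true
}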

#[tokio::main]
@@ -77,29 +76,35 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
async move {
match (req.method(), req.uri().path().trim_end_matches('/')) {
// Check that the server is up and running:
(&Method::GET, "/health") => {
Ok(Response::new("OK".into()))
},
(&Method::GET, "/health") => Ok(Response::new("OK".into())),
// Nodes send messages here:
(&Method::GET, "/submit") => {
let real_addr = real_ip::real_ip(addr, req.headers());
Ok(http_utils::upgrade_to_websocket(req, move |ws_send, ws_recv| async move {
let tx_to_aggregator = aggregator.subscribe_node();
let (mut tx_to_aggregator, mut ws_send)
= handle_node_websocket_connection(real_addr, ws_send, ws_recv, tx_to_aggregator, max_nodes_per_connection).await;
log::info!("Closing /submit connection from {:?}", addr);
// Tell the aggregator that this connection has closed, so it can tidy up.
let _ = tx_to_aggregator.send(FromWebsocket::Disconnected).await;
let _ = ws_send.close().await;
}))
},
// 404 for anything else:
_ => {
Ok(Response::builder()
.status(404)
.body("Not found".into())
.unwrap())
Ok(http_utils::upgrade_to_websocket(
req,
move |ws_send, ws_recv| async move {
let tx_to_aggregator = aggregator.subscribe_node();
let (mut tx_to_aggregator, mut ws_send) =
handle_node_websocket_connection(
real_addr,
ws_send,
ws_recv,
tx_to_aggregator,
max_nodes_per_connection,
)
.await;
log::info!("Closing /submit connection from {:?}", addr);
// Tell the aggregator that this connection has closed, so it can tidy up.
let _ = tx_to_aggregator.send(FromWebsocket::Disconnected).await;
let _ = ws_send.close().await;
},
))
}
// 404 for anything else:
_ => Ok(Response::builder()
.status(404)
.body("Not found".into())
.unwrap()),
}
}
});
@@ -114,7 +119,7 @@ async fn handle_node_websocket_connection<S>(
ws_send: http_utils::WsSender,
mut ws_recv: http_utils::WsReceiver,
mut tx_to_aggregator: S,
max_nodes_per_connection: usize
max_nodes_per_connection: usize,
) -> (S, http_utils::WsSender)
where
S: futures::Sink<FromWebsocket, Error = anyhow::Error> + Unpin + Send + 'static,

@@ -11,4 +11,4 @@ pub mod feed_message_de;
pub mod contains_matches;

/// Utilities to help with running tests from within this current workspace.
pub mod workspace;
pub mod workspace;

@@ -1,4 +1,7 @@
use std::{ops::{Deref, DerefMut}, time::Duration};
use std::{
ops::{Deref, DerefMut},
time::Duration,
};

use crate::feed_message_de::FeedMessage;
use common::ws_client;
@@ -51,10 +54,7 @@ impl ShardSender {
self.unbounded_send(ws_client::SentMessage::Binary(bytes))
}
/// Send JSON as a textual websocket message
pub fn send_json_text(
&mut self,
json: serde_json::Value,
) -> Result<(), ws_client::SendError> {
pub fn send_json_text(&mut self, json: serde_json::Value) -> Result<(), ws_client::SendError> {
let s = serde_json::to_string(&json).expect("valid string");
self.unbounded_send(ws_client::SentMessage::Text(s))
}
@@ -169,7 +169,6 @@ impl FeedSender {
}
}


/// Wrap a `ws_client::Receiver` with convenient utility methods for feed connections
pub struct FeedReceiver(ws_client::Receiver);

@@ -208,23 +207,26 @@ impl FeedReceiver {
/// Prefer [`FeedReceiver::recv_feed_messages`]; tests should generally be
/// robust in assuming that messages may not all be delivered at once (unless we are
/// specifically testing which messages are buffered together).
pub async fn recv_feed_messages_once_timeout(&mut self, timeout: Duration) -> Result<Vec<FeedMessage>, anyhow::Error> {
pub async fn recv_feed_messages_once_timeout(
&mut self,
timeout: Duration,
) -> Result<Vec<FeedMessage>, anyhow::Error> {
let msg = match tokio::time::timeout(timeout, self.0.next()).await {
// Timeout elapsed; no messages back:
Err(_) => return Ok(Vec::new()),
// Something came back; complain if there was an error or the stream closed:
Ok(res) => res.ok_or_else(|| anyhow::anyhow!("Stream closed: no more messages"))??
Ok(res) => res.ok_or_else(|| anyhow::anyhow!("Stream closed: no more messages"))??,
};

match msg {
ws_client::RecvMessage::Binary(data) => {
let messages = FeedMessage::from_bytes(&data)?;
Ok(messages)
},
}
ws_client::RecvMessage::Text(text) => {
let messages = FeedMessage::from_bytes(text.as_bytes())?;
Ok(messages)
},
}
}
}
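The match above separates three outcomes of awaiting a stream under a timeout: the timeout elapsed (fine, return nothing), the stream closed (an error here), or an item arrived. A reduced sketch of the same idea with generic names, not the real `FeedReceiver`:

use futures::StreamExt;
use std::time::Duration;

async fn next_or_empty<S: futures::Stream<Item = String> + Unpin>(
    stream: &mut S,
    timeout: Duration,
) -> anyhow::Result<Vec<String>> {
    match tokio::time::timeout(timeout, stream.next()).await {
        // Timeout elapsed; report "nothing yet" rather than an error:
        Err(_) => Ok(Vec::new()),
        // Stream ended; unexpected here, so complain:
        Ok(None) => Err(anyhow::anyhow!("Stream closed: no more messages")),
        // Got an item:
        Ok(Some(item)) => Ok(vec![item]),
    }
}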
@@ -232,7 +234,8 @@ impl FeedReceiver {
/// See `recv_feed_messages_once_timeout`.
pub async fn recv_feed_messages_once(&mut self) -> Result<Vec<FeedMessage>, anyhow::Error> {
// Default to a timeout of 30 seconds, meaning that the test will eventually end,
self.recv_feed_messages_once_timeout(Duration::from_secs(30)).await
self.recv_feed_messages_once_timeout(Duration::from_secs(30))
.await
}

/// Wait for feed messages to be sent back, building up a list of output messages until
@@ -241,12 +244,16 @@ impl FeedReceiver {
/// If no new messages are received within the timeout given, bail with whatever we have so far.
/// This differs from `recv_feed_messages` and `recv_feed_messages_once`, which will block indefinitely
/// waiting for something to arrive.
pub async fn recv_feed_messages_timeout(&mut self, timeout: Duration) -> Result<Vec<FeedMessage>, anyhow::Error> {
pub async fn recv_feed_messages_timeout(
&mut self,
timeout: Duration,
) -> Result<Vec<FeedMessage>, anyhow::Error> {
// Block as long as needed for messages to start coming in:
let mut feed_messages = match tokio::time::timeout(timeout, self.recv_feed_messages_once()).await {
Ok(msgs) => msgs?,
Err(_) => return Ok(Vec::new()),
};
let mut feed_messages =
match tokio::time::timeout(timeout, self.recv_feed_messages_once()).await {
Ok(msgs) => msgs?,
Err(_) => return Ok(Vec::new()),
};

// Then, loop a little to make sure we catch any additional messages that are sent soon after:
loop {
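The loop that follows (truncated in this hunk) keeps polling with the timeout and appends whatever arrives, returning once things go quiet. A sketch of that accumulate-until-idle pattern under assumed types (`recv_once` stands in for `recv_feed_messages_once`):

use std::time::Duration;

async fn drain_until_quiet<F, Fut>(mut recv_once: F, quiet: Duration) -> Vec<String>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Vec<String>>,
{
    let mut all = Vec::new();
    loop {
        match tokio::time::timeout(quiet, recv_once()).await {
            // A batch arrived in time; keep it and poll again.
            Ok(mut batch) => all.append(&mut batch),
            // Nothing new within the quiet period; return what we have.
            Err(_) => return all,
        }
    }
}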
@@ -271,6 +278,7 @@ impl FeedReceiver {
/// See `recv_feed_messages_timeout`.
pub async fn recv_feed_messages(&mut self) -> Result<Vec<FeedMessage>, anyhow::Error> {
// Default to a timeout of 30 seconds, meaning that the test will eventually end,
self.recv_feed_messages_timeout(Duration::from_secs(30)).await
self.recv_feed_messages_timeout(Duration::from_secs(30))
.await
}
}

@@ -36,7 +36,7 @@ pub enum StartOpts {
/// Where is the process that we can subscribe to the `/feed` of?
/// Eg: `127.0.0.1:3000`
feed_host: String,
}
},
}

/// This represents a telemetry server. It can be in different modes
@@ -47,7 +47,7 @@ pub enum Server {
/// A virtual shard that we can hand out.
virtual_shard: ShardProcess,
/// Core process that we can connect to.
core: CoreProcess
core: CoreProcess,
},
ShardAndCoreMode {
/// Command to run to start a new shard.
@@ -67,10 +67,9 @@ pub enum Server {
shards: DenseMap<ProcessId, ShardProcess>,
/// Core process that we can connect to.
core: CoreProcess,
}
},
}


#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Can't establish connection: {0}")]
@@ -88,23 +87,23 @@ pub enum Error {
)]
CannotAddShard,
#[error("The URI provided was invalid: {0}")]
InvalidUri(#[from] http::uri::InvalidUri)
InvalidUri(#[from] http::uri::InvalidUri),
}

impl Server {
pub fn get_core(&self) -> &CoreProcess {
match self {
Server::SingleProcessMode { core, .. } => core,
Server::ShardAndCoreMode { core, ..} => core,
Server::ConnectToExistingMode { core, .. } => core
Server::ShardAndCoreMode { core, .. } => core,
Server::ConnectToExistingMode { core, .. } => core,
}
}

pub fn get_shard(&self, id: ProcessId) -> Option<&ShardProcess> {
match self {
Server::SingleProcessMode { virtual_shard, .. } => Some(virtual_shard),
Server::ShardAndCoreMode { shards, ..} => shards.get(id),
Server::ConnectToExistingMode { shards, .. } => shards.get(id)
Server::ShardAndCoreMode { shards, .. } => shards.get(id),
Server::ConnectToExistingMode { shards, .. } => shards.get(id),
}
}

@@ -112,8 +111,8 @@ impl Server {
let shard = match self {
// Can't remove the pretend shard:
Server::SingleProcessMode { .. } => return false,
Server::ShardAndCoreMode { shards, ..} => shards.remove(id),
Server::ConnectToExistingMode { shards, .. } => shards.remove(id)
Server::ShardAndCoreMode { shards, .. } => shards.remove(id),
Server::ConnectToExistingMode { shards, .. } => shards.remove(id),
};

let shard = match shard {
@@ -137,12 +136,9 @@ impl Server {
// Run all kill futs simultaneously.
let handle = tokio::spawn(async move {
let (core, shards) = match self {
Server::SingleProcessMode { core, .. }
=> (core, DenseMap::new()),
Server::ShardAndCoreMode { core, shards, ..}
=> (core, shards),
Server::ConnectToExistingMode { core, shards, .. }
=> (core, shards)
Server::SingleProcessMode { core, .. } => (core, DenseMap::new()),
Server::ShardAndCoreMode { core, shards, .. } => (core, shards),
Server::ConnectToExistingMode { core, shards, .. } => (core, shards),
};

let shard_kill_futs = shards.into_iter().map(|(_, s)| s.kill());
@@ -157,15 +153,18 @@ impl Server {
pub async fn add_shard(&mut self) -> Result<ProcessId, Error> {
match self {
// Always get back the same "virtual" shard; we're always just talking to the core anyway.
Server::SingleProcessMode { virtual_shard, .. } => {
Ok(virtual_shard.id)
},
Server::SingleProcessMode { virtual_shard, .. } => Ok(virtual_shard.id),
// We're connecting to an existing process. Find the next host we've been told about
// round-robin style and use that as our new virtual shard.
Server::ConnectToExistingMode { submit_hosts, next_submit_host_idx, shards, .. } => {
Server::ConnectToExistingMode {
submit_hosts,
next_submit_host_idx,
shards,
..
} => {
let host = match submit_hosts.get(*next_submit_host_idx % submit_hosts.len()) {
Some(host) => host,
None => return Err(Error::CannotAddShard)
None => return Err(Error::CannotAddShard),
};
*next_submit_host_idx += 1;
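The round-robin selection is just an ever-growing index taken modulo the host count. The same logic in isolation, with an explicit empty-list guard added for safety (the code above assumes at least one submit host):

// Pick the next host round-robin; None if there are no hosts at all.
fn next_host<'a>(hosts: &'a [String], next_idx: &mut usize) -> Option<&'a String> {
    if hosts.is_empty() {
        return None;
    }
    let host = &hosts[*next_idx % hosts.len()];
    *next_idx += 1;
    Some(host)
}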
@@ -177,9 +176,13 @@ impl Server {
});

Ok(pid)
},
}
// Start a new process and return that.
Server::ShardAndCoreMode { shard_command, shards, core } => {
Server::ShardAndCoreMode {
shard_command,
shards,
core,
} => {
// Where is the URI we'll want to submit things to?
let core_shard_submit_uri = format!("http://{}/shard_submit", core.host);

@@ -224,7 +227,7 @@ impl Server {
});

Ok(pid)
},
}
}
}

@@ -240,31 +243,35 @@ impl Server {
id: ProcessId(0),
host: virtual_shard_host,
handle: None,
_channel_type: PhantomData
}
}
},
StartOpts::ShardAndCore { core_command, shard_command } => {
let core_process = Server::start_core(core_command).await?;
Server::ShardAndCoreMode {
core: core_process,
shard_command,
shards: DenseMap::new()
}
},
StartOpts::ConnectToExisting { feed_host, submit_hosts } => {
Server::ConnectToExistingMode {
submit_hosts,
next_submit_host_idx: 0,
shards: DenseMap::new(),
core: Process {
id: ProcessId(0),
host: feed_host,
handle: None,
_channel_type: PhantomData,
},
}
}
StartOpts::ShardAndCore {
core_command,
shard_command,
} => {
let core_process = Server::start_core(core_command).await?;
Server::ShardAndCoreMode {
core: core_process,
shard_command,
shards: DenseMap::new(),
}
}
StartOpts::ConnectToExisting {
feed_host,
submit_hosts,
} => Server::ConnectToExistingMode {
submit_hosts,
next_submit_host_idx: 0,
shards: DenseMap::new(),
core: Process {
id: ProcessId(0),
host: feed_host,
handle: None,
_channel_type: PhantomData,
},
},
};

Ok(server)
@@ -340,7 +347,9 @@ impl<Channel> Process<Channel> {
}

/// Establish a raw WebSocket connection (not cancel-safe)
async fn connect_to_uri_raw(uri: &http::Uri) -> Result<(ws_client::RawSender, ws_client::RawReceiver), Error> {
async fn connect_to_uri_raw(
uri: &http::Uri,
) -> Result<(ws_client::RawSender, ws_client::RawReceiver), Error> {
ws_client::connect(uri)
.await
.map(|c| c.into_raw())
@@ -371,19 +380,26 @@ impl<Send: From<ws_client::Sender>, Recv: From<ws_client::Receiver>> Process<(Se

impl ShardProcess {
/// Establish a raw connection to the process
pub async fn connect_node_raw(&self) -> Result<(ws_client::RawSender, ws_client::RawReceiver), Error> {
pub async fn connect_node_raw(
&self,
) -> Result<(ws_client::RawSender, ws_client::RawReceiver), Error> {
let uri = format!("http://{}/submit", self.host).parse()?;
connect_to_uri_raw(&uri).await
}

/// Establish a connection to the process
pub async fn connect_node(&self) -> Result<(channels::ShardSender, channels::ShardReceiver), Error> {
pub async fn connect_node(
&self,
) -> Result<(channels::ShardSender, channels::ShardReceiver), Error> {
let uri = format!("http://{}/submit", self.host).parse()?;
Process::connect_to_uri(&uri).await
}

/// Establish multiple connections to the process
pub async fn connect_multiple_nodes(&self, num_connections: usize) -> Result<Vec<(channels::ShardSender, channels::ShardReceiver)>, Error> {
pub async fn connect_multiple_nodes(
&self,
num_connections: usize,
) -> Result<Vec<(channels::ShardSender, channels::ShardReceiver)>, Error> {
let uri = format!("http://{}/submit", self.host).parse()?;
Process::connect_multiple_to_uri(&uri, num_connections).await
}
@@ -391,19 +407,26 @@ impl ShardProcess {

impl CoreProcess {
/// Establish a raw connection to the process
pub async fn connect_feed_raw(&self) -> Result<(ws_client::RawSender, ws_client::RawReceiver), Error> {
pub async fn connect_feed_raw(
&self,
) -> Result<(ws_client::RawSender, ws_client::RawReceiver), Error> {
let uri = format!("http://{}/feed", self.host).parse()?;
connect_to_uri_raw(&uri).await
}

/// Establish a connection to the process
pub async fn connect_feed(&self) -> Result<(channels::FeedSender, channels::FeedReceiver), Error> {
pub async fn connect_feed(
&self,
) -> Result<(channels::FeedSender, channels::FeedReceiver), Error> {
let uri = format!("http://{}/feed", self.host).parse()?;
Process::connect_to_uri(&uri).await
}

/// Establish multiple connections to the process
pub async fn connect_multiple_feeds(&self, num_connections: usize) -> Result<Vec<(channels::FeedSender, channels::FeedReceiver)>, Error> {
pub async fn connect_multiple_feeds(
&self,
num_connections: usize,
) -> Result<Vec<(channels::FeedSender, channels::FeedReceiver)>, Error> {
let uri = format!("http://{}/feed", self.host).parse()?;
Process::connect_multiple_to_uri(&uri, num_connections).await
}
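A short usage note: these helpers are what tests reach for to spin up many simulated nodes or feeds at once. A hedged sketch of calling them, assuming this crate's `Server` and `Error` types are in scope and a server is already running:

// Hypothetical test snippet exercising the connection helpers above:
async fn example(server: &mut Server) -> Result<(), Error> {
    let shard_id = server.add_shard().await?;
    let shard = server.get_shard(shard_id).expect("shard we just added");
    // Ten independent node connections to the shard's /submit endpoint:
    let _nodes = shard.connect_multiple_nodes(10).await?;
    // And a couple of feed connections to the core's /feed endpoint:
    let _feeds = server.get_core().connect_multiple_feeds(2).await?;
    Ok(())
}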
@@ -1,5 +1,5 @@
use common::ws_client;
use anyhow::{anyhow, Context};
use common::ws_client;
use tokio::io::BufReader;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite};
use tokio::time::Duration;
@@ -45,13 +45,9 @@ pub async fn wait_for_line_containing<R: AsyncRead + Unpin, F: Fn(&str) -> bool>

let line = match line {
// timeout expired; couldn't get port:
Err(_) => {
return Err(anyhow!("Timeout elapsed waiting for text match"))
}
Err(_) => return Err(anyhow!("Timeout elapsed waiting for text match")),
// Something went wrong reading line; bail:
Ok(Err(e)) => {
return Err(anyhow!("Could not read line from stdout: {}", e))
},
Ok(Err(e)) => return Err(anyhow!("Could not read line from stdout: {}", e)),
// No more output; process ended? bail:
Ok(Ok(None)) => {
return Err(anyhow!(
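The helper's shape (a per-line timeout around a buffered read, each line checked against a predicate) is easy to reproduce. A minimal sketch under assumed names, not the repo's exact signature:

use anyhow::anyhow;
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tokio::time::Duration;

// Read lines until one matches `pred`, failing if any line takes too long.
async fn wait_for_match<R: AsyncRead + Unpin>(
    reader: R,
    pred: impl Fn(&str) -> bool,
    timeout: Duration,
) -> anyhow::Result<String> {
    let mut lines = BufReader::new(reader).lines();
    loop {
        match tokio::time::timeout(timeout, lines.next_line()).await {
            Err(_) => return Err(anyhow!("Timeout elapsed waiting for text match")),
            Ok(Err(e)) => return Err(anyhow!("Could not read line: {}", e)),
            Ok(Ok(None)) => return Err(anyhow!("No more output; process ended?")),
            Ok(Ok(Some(line))) if pred(&line) => return Ok(line),
            Ok(Ok(Some(_))) => continue,
        }
    }
}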
@@ -28,7 +28,8 @@ fn telemetry_command(bin: &'static str, release_mode: bool) -> Result<Command, s
cmd = cmd.arg("--release");
}

cmd = cmd.arg("--bin")
cmd = cmd
.arg("--bin")
.arg(bin)
.arg("--manifest-path")
.arg(workspace_dir)

@@ -1,4 +1,4 @@
mod commands;
mod start_server;

pub use start_server::*;
pub use start_server::*;

@@ -1,28 +1,26 @@
use super::commands;
use crate::server::{self, Server, Command};
use crate::server::{self, Command, Server};

/// Additional options to pass to the core command.
pub struct CoreOpts {
pub feed_timeout: Option<u64>
pub feed_timeout: Option<u64>,
}

impl Default for CoreOpts {
fn default() -> Self {
Self {
feed_timeout: None
}
Self { feed_timeout: None }
}
}

/// Additional options to pass to the shard command.
pub struct ShardOpts {
pub max_nodes_per_connection: Option<usize>
pub max_nodes_per_connection: Option<usize>,
}

impl Default for ShardOpts {
fn default() -> Self {
Self {
max_nodes_per_connection: None
max_nodes_per_connection: None,
}
}
}
@@ -44,12 +42,18 @@ impl Default for ShardOpts {
/// - `TELEMETRY_SUBMIT_HOSTS` - hosts (comma separated) to connect to for telemetry `/submit`s.
/// - `TELEMETRY_FEED_HOST` - host to connect to for feeds (eg 127.0.0.1:3000)
///
pub async fn start_server(release_mode: bool, core_opts: CoreOpts, shard_opts: ShardOpts) -> Server {
pub async fn start_server(
release_mode: bool,
core_opts: CoreOpts,
shard_opts: ShardOpts,
) -> Server {
// Start a single process:
if let Ok(bin) = std::env::var("TELEMETRY_BIN") {
return Server::start(server::StartOpts::SingleProcess {
command: Command::new(bin)
}).await.unwrap();
command: Command::new(bin),
})
.await
.unwrap();
}

// Connect to a running instance:
@@ -61,13 +65,18 @@ pub async fn start_server(release_mode: bool, core_opts: CoreOpts, shard_opts: S
return Server::start(server::StartOpts::ConnectToExisting {
feed_host,
submit_hosts,
}).await.unwrap();
})
.await
.unwrap();
}

// Build the shard command
let mut shard_command = std::env::var("TELEMETRY_SHARD_BIN")
.map(|val| Command::new(val))
.unwrap_or_else(|_| commands::cargo_run_telemetry_shard(release_mode).expect("must be in rust workspace to run shard command"));
.unwrap_or_else(|_| {
commands::cargo_run_telemetry_shard(release_mode)
.expect("must be in rust workspace to run shard command")
});

// Append additional opts to the shard command
if let Some(max_nodes_per_connection) = shard_opts.max_nodes_per_connection {
@@ -79,7 +88,10 @@ pub async fn start_server(release_mode: bool, core_opts: CoreOpts, shard_opts: S
// Build the core command
let mut core_command = std::env::var("TELEMETRY_CORE_BIN")
.map(|val| Command::new(val))
.unwrap_or_else(|_| commands::cargo_run_telemetry_core(release_mode).expect("must be in rust workspace to run core command"));
.unwrap_or_else(|_| {
commands::cargo_run_telemetry_core(release_mode)
.expect("must be in rust workspace to run core command")
});

// Append additional opts to the core command
if let Some(feed_timeout) = core_opts.feed_timeout {
@@ -91,8 +103,10 @@ pub async fn start_server(release_mode: bool, core_opts: CoreOpts, shard_opts: S
// Start the server
Server::start(server::StartOpts::ShardAndCore {
shard_command,
core_command
}).await.unwrap()
core_command,
})
.await
.unwrap()
}

/// Start a telemetry core server in debug mode. See [`start_server`] for details.
@@ -103,4 +117,4 @@ pub async fn start_server_debug() -> Server {
/// Start a telemetry core server in release mode. See [`start_server`] for details.
pub async fn start_server_release() -> Server {
start_server(true, CoreOpts::default(), ShardOpts::default()).await
}
}
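Putting the pieces together, a test built on these helpers follows one shape: start (or attach to) a server, connect feeds and nodes, then assert on the feed messages that come back. A hedged sketch using only the APIs shown above:

// Hypothetical integration test shape (APIs as exposed by this module):
#[tokio::test]
async fn feed_receives_messages() {
    let server = start_server_debug().await;
    let (_feed_tx, mut feed_rx) = server
        .get_core()
        .connect_feed()
        .await
        .expect("can connect a feed");
    // Blocks up to the default 30s timeout for messages to arrive.
    let _msgs = feed_rx.recv_feed_messages().await.expect("feed messages");
}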