Split msg into JSON and internal variant, and other bits

This commit is contained in:
James Wilson
2021-06-16 11:08:33 +01:00
parent c276c2065a
commit 8e25b4fbdf
14 changed files with 506 additions and 147 deletions
+6 -6
View File
@@ -9,11 +9,11 @@ use crate::feed::{self, FeedMessageSerializer};
use crate::node::connector::NodeConnector;
use common::ws::MuteReason;
use common::shard::ShardConnId;
use common::types::{ConnId, NodeDetails};
use common::util::{DenseMap, Hash};
use common::types::{ConnId, NodeDetails, BlockHash};
use common::util::{DenseMap};
pub struct Aggregator {
genesis_hashes: HashMap<Hash, ChainId>,
genesis_hashes: HashMap<BlockHash, ChainId>,
labels: HashMap<Label, ChainId>,
chains: DenseMap<ChainEntry>,
feeds: DenseMap<Addr<FeedConnector>>,
@@ -26,7 +26,7 @@ pub struct ChainEntry {
/// Address to the `Chain` agent
addr: Addr<Chain>,
/// Genesis [`Hash`] of the chain
genesis_hash: Hash,
genesis_hash: BlockHash,
/// String name of the chain
label: Label,
/// Node count
@@ -66,7 +66,7 @@ impl Aggregator {
/// or the address is disconnected (actor dropped), create a new one.
pub fn lazy_chain(
&mut self,
genesis_hash: Hash,
genesis_hash: BlockHash,
label: &str,
ctx: &mut <Self as Actor>::Context,
) -> ChainId {
@@ -125,7 +125,7 @@ pub struct AddNode {
/// Details of the node being added to the aggregator
pub node: NodeDetails,
/// Genesis [`Hash`] of the chain the node is being added to.
pub genesis_hash: Hash,
pub genesis_hash: BlockHash,
/// Source from which this node is being added (Direct | Shard)
pub source: NodeSource,
}
+1 -33
View File
@@ -1,6 +1,5 @@
use std::collections::HashSet;
use std::iter::FromIterator;
use std::net::Ipv4Addr;
use actix::prelude::*;
use actix_http::ws::Codec;
@@ -19,7 +18,6 @@ mod shard;
use aggregator::{Aggregator, GetHealth};
use feed::connector::FeedConnector;
use location::{Locator, LocatorFactory};
use node::connector::NodeConnector;
use shard::connector::ShardConnector;
const VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -74,35 +72,6 @@ impl From<&LogLevel> for log::LevelFilter {
}
}
/// Entry point for connecting nodes
#[get("/submit")]
async fn node_route(
req: HttpRequest,
stream: web::Payload,
aggregator: web::Data<Addr<Aggregator>>,
locator: web::Data<Addr<Locator>>,
) -> Result<HttpResponse, Error> {
let ip = req
.connection_info()
.realip_remote_addr()
.and_then(|mut addr| {
if let Some(port_idx) = addr.find(':') {
addr = &addr[..port_idx];
}
addr.parse::<Ipv4Addr>().ok()
});
let mut res = ws::handshake(&req)?;
let aggregator = aggregator.get_ref().clone();
let locator = locator.get_ref().clone().recipient();
Ok(res.streaming(ws::WebsocketContext::with_codec(
NodeConnector::new(aggregator, locator, ip),
stream,
Codec::new().max_size(10 * 1024 * 1024), // 10mb frame limit
)))
}
#[get("/shard_submit/{chain_hash}")]
async fn shard_route(
req: HttpRequest,
@@ -112,7 +81,7 @@ async fn shard_route(
path: web::Path<Box<str>>,
) -> Result<HttpResponse, Error> {
let hash_str = path.into_inner();
let genesis_hash = hash_str.parse()?;
let genesis_hash = hash_str.parse::<common::json::Hash>()?.into();
println!("Genesis hash {}", genesis_hash);
@@ -180,7 +149,6 @@ async fn main() -> std::io::Result<()> {
.wrap(middleware::NormalizePath::default())
.data(aggregator.clone())
.data(locator.clone())
.service(node_route)
.service(feed_route)
.service(shard_route)
.service(health)
+3 -4
View File
@@ -8,8 +8,7 @@ use crate::location::LocateRequest;
use actix::prelude::*;
use actix_web_actors::ws::{self, CloseReason};
use bincode::Options;
use common::types::NodeId;
use common::util::Hash;
use common::types::{BlockHash, NodeId};
use common::ws::{MultipartHandler, WsMessage};
use common::shard::{ShardMessage, ShardConnId, BackendMessage};
@@ -24,7 +23,7 @@ pub struct ShardConnector {
/// Aggregator actor address
aggregator: Addr<Aggregator>,
/// Genesis hash of the chain this connection will be submitting data for
genesis_hash: Hash,
genesis_hash: BlockHash,
/// Chain address to which this shard connector is delegating messages
chain: Option<Addr<Chain>>,
/// Transient mapping of `ShardConnId` to external IP address.
@@ -57,7 +56,7 @@ impl ShardConnector {
pub fn new(
aggregator: Addr<Aggregator>,
locator: Recipient<LocateRequest>,
genesis_hash: Hash,
genesis_hash: BlockHash,
) -> Self {
Self {
hb: Instant::now(),