Squashed diff from mh-backend-shard

This commit is contained in:
Maciej Hirsz
2021-06-08 12:17:00 +02:00
parent 505e5a387e
commit 8db384bed3
30 changed files with 1688 additions and 457 deletions
+46 -23
View File
@@ -1,14 +1,16 @@
use actix::prelude::*;
use actix_web_actors::ws::{CloseCode, CloseReason};
use ctor::ctor;
use std::collections::{HashMap, HashSet};
use crate::shard::connector::ShardConnector;
use crate::chain::{self, Chain, ChainId, Label};
use crate::feed::connector::{Connected, FeedConnector, FeedId};
use crate::feed::{self, FeedMessageSerializer};
use crate::node::connector::{Mute, NodeConnector};
use crate::types::{ConnId, NodeDetails};
use crate::util::{DenseMap, Hash};
use crate::node::connector::NodeConnector;
use shared::ws::MuteReason;
use shared::shard::ShardConnId;
use shared::types::{ConnId, NodeDetails};
use shared::util::{DenseMap, Hash};
pub struct Aggregator {
genesis_hashes: HashMap<Hash, ChainId>,
@@ -124,10 +126,24 @@ pub struct AddNode {
pub node: NodeDetails,
/// Genesis [`Hash`] of the chain the node is being added to.
pub genesis_hash: Hash,
/// Connection id used by the node connector for multiplexing parachains
pub conn_id: ConnId,
/// Address of the NodeConnector actor
pub node_connector: Addr<NodeConnector>,
/// Source from which this node is being added (Direct | Shard)
pub source: NodeSource,
}
pub enum NodeSource {
Direct {
/// Connection id used by the node connector for multiplexing parachains
conn_id: ConnId,
/// Address of the NodeConnector actor
node_connector: Addr<NodeConnector>,
},
// TODO
Shard {
/// `ShardConnId` that identifies the node connection within a shard.
sid: ShardConnId,
/// Address to the ShardConnector actor
shard_connector: Addr<ShardConnector>,
}
}
/// Message sent from the Chain to the Aggregator when the Chain loses all nodes
@@ -183,25 +199,36 @@ pub struct NodeCount(pub ChainId, pub usize);
#[rtype(result = "usize")]
pub struct GetHealth;
impl NodeSource {
pub fn mute(&self, reason: MuteReason) {
match self {
NodeSource::Direct { node_connector, .. } => {
node_connector.do_send(reason);
},
// TODO
NodeSource::Shard { shard_connector, .. } => {
// shard_connector.do_send(Mute { reason });
},
}
}
}
impl Handler<AddNode> for Aggregator {
type Result = ();
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
if self.denylist.contains(&*msg.node.chain) {
log::warn!(target: "Aggregator::AddNode", "'{}' is on the denylist.", msg.node.chain);
let AddNode { node_connector, .. } = msg;
let reason = CloseReason {
code: CloseCode::Abnormal,
description: Some("Denied".into()),
};
node_connector.do_send(Mute { reason });
msg.source.mute(MuteReason::Denied);
return;
}
let AddNode {
node,
genesis_hash,
conn_id,
node_connector,
source,
// conn_id,
// node_connector,
} = msg;
log::trace!(target: "Aggregator::AddNode", "New node connected. Chain '{}'", node.chain);
@@ -213,16 +240,12 @@ impl Handler<AddNode> for Aggregator {
if chain.nodes < chain.max_nodes {
chain.addr.do_send(chain::AddNode {
node,
conn_id,
node_connector,
source,
});
} else {
log::warn!(target: "Aggregator::AddNode", "Chain {} is over quota ({})", chain.label, chain.max_nodes);
let reason = CloseReason {
code: CloseCode::Again,
description: Some("Overquota".into()),
};
node_connector.do_send(Mute { reason });
source.mute(MuteReason::Overquota);
}
}
}
+88 -76
View File
@@ -3,16 +3,13 @@ use rustc_hash::FxHashMap;
use std::collections::HashMap;
use std::sync::Arc;
use crate::aggregator::{Aggregator, DropChain, NodeCount, RenameChain};
use crate::aggregator::{Aggregator, DropChain, NodeCount, NodeSource, RenameChain};
use crate::feed::connector::{FeedConnector, FeedId, Subscribed, Unsubscribed};
use crate::feed::{self, FeedMessageSerializer};
use crate::node::{
connector::{Initialize, NodeConnector},
message::Payload,
Node,
};
use crate::types::{Block, BlockNumber, ConnId, NodeDetails, NodeId, NodeLocation, Timestamp};
use crate::util::{now, DenseMap, NumStats};
use crate::node::Node;
use shared::types::{Block, NodeDetails, NodeId, NodeLocation, Timestamp};
use shared::util::{now, DenseMap, NumStats};
use shared::node::Payload;
const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes
@@ -204,10 +201,8 @@ impl Actor for Chain {
pub struct AddNode {
/// Details of the node being added to the aggregator
pub node: NodeDetails,
/// Connection id used by the node connector for multiplexing parachains
pub conn_id: ConnId,
/// Address of the NodeConnector actor to which we send [`Initialize`] or [`Mute`] messages.
pub node_connector: Addr<NodeConnector>,
/// Source from which this node is being added (Direct | Shard)
pub source: NodeSource,
}
/// Message sent from the NodeConnector to the Chain when it receives new telemetry data
@@ -249,14 +244,38 @@ pub struct LocateNode {
pub location: Arc<NodeLocation>,
}
impl NodeSource {
pub fn init(self, nid: NodeId, chain: Addr<Chain>) -> bool {
match self {
NodeSource::Direct { conn_id, node_connector } => {
node_connector
.try_send(crate::node::connector::Initialize {
nid,
conn_id,
chain,
})
.is_ok()
},
NodeSource::Shard { sid, shard_connector } => {
shard_connector
.try_send(crate::shard::connector::Initialize {
nid,
sid,
chain,
})
.is_ok()
}
}
}
}
impl Handler<AddNode> for Chain {
type Result = ();
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
let AddNode {
node,
conn_id,
node_connector,
source,
} = msg;
log::trace!(target: "Chain::AddNode", "New node connected. Chain '{}', node count goes from {} to {}", node.chain, self.nodes.len(), self.nodes.len() + 1);
self.increment_label_count(&node.chain);
@@ -264,14 +283,7 @@ impl Handler<AddNode> for Chain {
let nid = self.nodes.add(Node::new(node));
let chain = ctx.address();
if node_connector
.try_send(Initialize {
nid,
conn_id,
chain,
})
.is_err()
{
if source.init(nid, chain) {
self.nodes.remove(nid);
} else if let Some(node) = self.nodes.get(nid) {
self.serializer.push(feed::AddedNode(nid, node));
@@ -355,60 +367,60 @@ impl Handler<UpdateNode> for Chain {
self.serializer.push(feed::NodeIOUpdate(nid, io));
}
}
Payload::AfgAuthoritySet(authority) => {
node.set_validator_address(authority.authority_id.clone());
self.broadcast();
return;
}
Payload::AfgFinalized(finalized) => {
if let Ok(finalized_number) = finalized.finalized_number.parse::<BlockNumber>()
{
if let Some(addr) = node.details().validator.clone() {
self.serializer.push(feed::AfgFinalized(
addr,
finalized_number,
finalized.finalized_hash,
));
self.broadcast_finality();
}
}
return;
}
Payload::AfgReceivedPrecommit(precommit) => {
if let Ok(finalized_number) =
precommit.received.target_number.parse::<BlockNumber>()
{
if let Some(addr) = node.details().validator.clone() {
let voter = precommit.received.voter.clone();
self.serializer.push(feed::AfgReceivedPrecommit(
addr,
finalized_number,
precommit.received.target_hash,
voter,
));
self.broadcast_finality();
}
}
return;
}
Payload::AfgReceivedPrevote(prevote) => {
if let Ok(finalized_number) =
prevote.received.target_number.parse::<BlockNumber>()
{
if let Some(addr) = node.details().validator.clone() {
let voter = prevote.received.voter.clone();
self.serializer.push(feed::AfgReceivedPrevote(
addr,
finalized_number,
prevote.received.target_hash,
voter,
));
self.broadcast_finality();
}
}
return;
}
Payload::AfgReceivedCommit(_) => {}
// Payload::AfgAuthoritySet(authority) => {
// node.set_validator_address(authority.authority_id.clone());
// self.broadcast();
// return;
// }
// Payload::AfgFinalized(finalized) => {
// if let Ok(finalized_number) = finalized.finalized_number.parse::<BlockNumber>()
// {
// if let Some(addr) = node.details().validator.clone() {
// self.serializer.push(feed::AfgFinalized(
// addr,
// finalized_number,
// finalized.finalized_hash,
// ));
// self.broadcast_finality();
// }
// }
// return;
// }
// Payload::AfgReceivedPrecommit(precommit) => {
// if let Ok(finalized_number) =
// precommit.received.target_number.parse::<BlockNumber>()
// {
// if let Some(addr) = node.details().validator.clone() {
// let voter = precommit.received.voter.clone();
// self.serializer.push(feed::AfgReceivedPrecommit(
// addr,
// finalized_number,
// precommit.received.target_hash,
// voter,
// ));
// self.broadcast_finality();
// }
// }
// return;
// }
// Payload::AfgReceivedPrevote(prevote) => {
// if let Ok(finalized_number) =
// prevote.received.target_number.parse::<BlockNumber>()
// {
// if let Some(addr) = node.details().validator.clone() {
// let voter = prevote.received.voter.clone();
// self.serializer.push(feed::AfgReceivedPrevote(
// addr,
// finalized_number,
// prevote.received.target_hash,
// voter,
// ));
// self.broadcast_finality();
// }
// }
// return;
// }
// Payload::AfgReceivedCommit(_) => {}
_ => (),
}
+50 -23
View File
@@ -3,20 +3,33 @@ use serde::Serialize;
use std::mem;
use crate::node::Node;
use crate::types::{
Address, BlockDetails, BlockHash, BlockNumber, NodeHardware, NodeIO, NodeId, NodeStats,
Timestamp,
};
use serde_json::to_writer;
use shared::types::{
Address, BlockDetails, BlockHash, BlockNumber, NodeHardware, NodeIO, NodeId, NodeStats,
Timestamp, NodeDetails,
};
pub mod connector;
use connector::Serialized;
pub trait FeedMessage: Serialize {
pub trait FeedMessage {
const ACTION: u8;
}
pub trait FeedMessageWrite: FeedMessage {
fn write_to_feed(&self, ser: &mut FeedMessageSerializer);
}
impl<T> FeedMessageWrite for T
where
T: FeedMessage + Serialize,
{
fn write_to_feed(&self, ser: &mut FeedMessageSerializer) {
ser.write(self)
}
}
pub struct FeedMessageSerializer {
/// Current buffer,
buffer: Vec<u8>,
@@ -33,7 +46,7 @@ impl FeedMessageSerializer {
pub fn push<Message>(&mut self, msg: Message)
where
Message: FeedMessage,
Message: FeedMessageWrite,
{
let glue = match self.buffer.len() {
0 => b'[',
@@ -41,9 +54,16 @@ impl FeedMessageSerializer {
};
self.buffer.push(glue);
let _ = to_writer(&mut self.buffer, &Message::ACTION);
self.write(&Message::ACTION);
self.buffer.push(b',');
let _ = to_writer(&mut self.buffer, &msg);
msg.write_to_feed(self);
}
fn write<S>(&mut self, value: &S)
where
S: Serialize,
{
let _ = to_writer(&mut self.buffer, value);
}
pub fn finalize(&mut self) -> Option<Serialized> {
@@ -175,21 +195,28 @@ pub struct AfgAuthoritySet(
#[derive(Serialize)]
pub struct StaleNode(pub NodeId);
impl Serialize for AddedNode<'_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
impl FeedMessageWrite for AddedNode<'_> {
fn write_to_feed(&self, ser: &mut FeedMessageSerializer) {
let AddedNode(nid, node) = self;
let mut tup = serializer.serialize_tuple(8)?;
tup.serialize_element(nid)?;
tup.serialize_element(node.details())?;
tup.serialize_element(node.stats())?;
tup.serialize_element(node.io())?;
tup.serialize_element(node.hardware())?;
tup.serialize_element(node.block_details())?;
tup.serialize_element(&node.location())?;
tup.serialize_element(&node.startup_time())?;
tup.end()
let details = node.details();
let details = (
&details.name,
&details.implementation,
&details.version,
&details.validator,
&details.network_id,
);
ser.write(&(
nid,
details,
node.stats(),
node.io(),
node.hardware(),
node.block_details(),
&node.location(),
&node.startup_time(),
));
}
}
+1 -1
View File
@@ -1,10 +1,10 @@
use crate::aggregator::{Aggregator, Connect, Disconnect, NoMoreFinality, SendFinality, Subscribe};
use crate::chain::Unsubscribe;
use crate::feed::{FeedMessageSerializer, Pong};
use crate::util::fnv;
use actix::prelude::*;
use actix_web_actors::ws;
use bytes::Bytes;
use shared::util::fnv;
use std::time::{Duration, Instant};
pub type FeedId = usize;
@@ -7,7 +7,7 @@ use rustc_hash::FxHashMap;
use serde::Deserialize;
use crate::chain::{Chain, LocateNode};
use crate::types::{NodeId, NodeLocation};
use shared::types::{NodeId, NodeLocation};
#[derive(Clone)]
pub struct Locator {
+11 -7
View File
@@ -12,21 +12,20 @@ use simple_logger::SimpleLogger;
mod aggregator;
mod chain;
mod feed;
mod location;
mod node;
mod shard;
mod types;
mod util;
use aggregator::{Aggregator, GetHealth};
use feed::connector::FeedConnector;
use location::{Locator, LocatorFactory};
use node::connector::NodeConnector;
use shard::connector::ShardConnector;
use util::{Locator, LocatorFactory};
const VERSION: &str = env!("CARGO_PKG_VERSION");
const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
const NAME: &str = "Substrate Telemetry Backend";
const ABOUT: &str = "This is the Telemetry Backend that injects and provide the data sent by Substrate/Polkadot nodes";
const NAME: &str = "Substrate Telemetry Backend Core";
const ABOUT: &str = "This is the Telemetry Backend Core that injects and provide the data sent by Substrate/Polkadot nodes";
#[derive(Clap, Debug)]
#[clap(name = NAME, version = VERSION, author = AUTHORS, about = ABOUT)]
@@ -109,17 +108,21 @@ async fn shard_route(
req: HttpRequest,
stream: web::Payload,
aggregator: web::Data<Addr<Aggregator>>,
locator: web::Data<Addr<Locator>>,
path: web::Path<Box<str>>,
) -> Result<HttpResponse, Error> {
let hash_str = path.into_inner();
let genesis_hash = hash_str.parse()?;
println!("Genesis hash {}", genesis_hash);
let mut res = ws::handshake(&req)?;
let aggregator = aggregator.get_ref().clone();
let locator = locator.get_ref().clone().recipient();
Ok(res.streaming(ws::WebsocketContext::with_codec(
ShardConnector::new(aggregator, genesis_hash),
ShardConnector::new(aggregator, locator, genesis_hash),
stream,
Codec::new().max_size(10 * 1024 * 1024), // 10mb frame limit
)))
@@ -171,7 +174,7 @@ async fn main() -> std::io::Result<()> {
let aggregator = Aggregator::new(denylist).start();
let factory = LocatorFactory::new();
let locator = SyncArbiter::start(4, move || factory.create());
log::info!("Starting telemetry version: {}", env!("CARGO_PKG_VERSION"));
log::info!("Starting Telemetry Core version: {}", env!("CARGO_PKG_VERSION"));
HttpServer::new(move || {
App::new()
.wrap(middleware::NormalizePath::default())
@@ -179,6 +182,7 @@ async fn main() -> std::io::Result<()> {
.data(locator.clone())
.service(node_route)
.service(feed_route)
.service(shard_route)
.service(health)
})
.bind(opts.socket)?
+3 -5
View File
@@ -1,15 +1,13 @@
use std::sync::Arc;
use crate::types::{
use shared::types::{
Block, BlockDetails, NodeDetails, NodeHardware, NodeIO, NodeId, NodeLocation, NodeStats,
Timestamp,
};
use crate::util::now;
use shared::util::now;
use shared::node::SystemInterval;
pub mod connector;
pub mod message;
use message::SystemInterval;
/// Minimum time between block below broadcasting updates to the browser gets throttled, in ms.
const THROTTLE_THRESHOLD: u64 = 100;
+22 -67
View File
@@ -1,25 +1,22 @@
use std::collections::BTreeMap;
use std::mem;
use std::net::Ipv4Addr;
use std::time::{Duration, Instant};
use crate::aggregator::{AddNode, Aggregator};
use crate::aggregator::{AddNode, Aggregator, NodeSource};
use crate::chain::{Chain, RemoveNode, UpdateNode};
use crate::node::message::{NodeMessage, Payload};
use crate::location::LocateRequest;
use crate::node::NodeId;
use crate::types::ConnId;
use crate::util::LocateRequest;
use actix::prelude::*;
use actix_http::ws::Item;
use actix_web_actors::ws::{self, CloseReason};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use shared::types::ConnId;
use shared::ws::{MultipartHandler, WsMessage, MuteReason};
use shared::node::{NodeMessage, Payload};
/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
/// Continuation buffer limit, 10mb
const CONT_BUF_LIMIT: usize = 10 * 1024 * 1024;
pub struct NodeConnector {
/// Multiplexing connections by id
@@ -32,8 +29,8 @@ pub struct NodeConnector {
ip: Option<Ipv4Addr>,
/// Actix address of location services
locator: Recipient<LocateRequest>,
/// Buffer for constructing continuation messages
contbuf: BytesMut,
/// Helper for handling continuation messages
multipart: MultipartHandler,
}
enum ConnMultiplex {
@@ -85,7 +82,7 @@ impl NodeConnector {
aggregator,
ip,
locator,
contbuf: BytesMut::new(),
multipart: MultipartHandler::default(),
}
}
@@ -123,8 +120,10 @@ impl NodeConnector {
self.aggregator.do_send(AddNode {
node: connected.node,
genesis_hash: connected.genesis_hash,
conn_id,
node_connector: ctx.address(),
source: NodeSource::Direct {
conn_id,
node_connector: ctx.address(),
},
});
} else {
if backlog.len() >= 10 {
@@ -136,42 +135,14 @@ impl NodeConnector {
}
}
}
fn start_frame(&mut self, bytes: &[u8]) {
if !self.contbuf.is_empty() {
log::error!("Unused continuation buffer");
self.contbuf.clear();
}
self.continue_frame(bytes);
}
fn continue_frame(&mut self, bytes: &[u8]) {
if self.contbuf.len() + bytes.len() <= CONT_BUF_LIMIT {
self.contbuf.extend_from_slice(&bytes);
} else {
log::error!("Continuation buffer overflow");
self.contbuf = BytesMut::new();
}
}
fn finish_frame(&mut self) -> Bytes {
mem::replace(&mut self.contbuf, BytesMut::new()).freeze()
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct Mute {
pub reason: CloseReason,
}
impl Handler<Mute> for NodeConnector {
impl Handler<MuteReason> for NodeConnector {
type Result = ();
fn handle(&mut self, msg: Mute, ctx: &mut Self::Context) {
let Mute { reason } = msg;
log::debug!(target: "NodeConnector::Mute", "Muting a node. Reason: {:?}", reason.description);
fn handle(&mut self, msg: MuteReason, ctx: &mut Self::Context) {
log::debug!(target: "NodeConnector::Mute", "Muting a node. Reason: {:?}", msg);
ctx.close(Some(reason));
ctx.close(Some(msg.into()));
ctx.stop();
}
}
@@ -221,34 +192,18 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
self.hb = Instant::now();
let data = match msg {
Ok(ws::Message::Ping(msg)) => {
let data = match msg.map(|msg| self.multipart.handle(msg)) {
Ok(WsMessage::Nop) => return,
Ok(WsMessage::Ping(msg)) => {
ctx.pong(&msg);
return;
}
Ok(ws::Message::Pong(_)) => return,
Ok(ws::Message::Text(text)) => text.into_bytes(),
Ok(ws::Message::Binary(data)) => data,
Ok(ws::Message::Close(reason)) => {
Ok(WsMessage::Data(data)) => data,
Ok(WsMessage::Close(reason)) => {
ctx.close(reason);
ctx.stop();
return;
}
Ok(ws::Message::Nop) => return,
Ok(ws::Message::Continuation(cont)) => match cont {
Item::FirstText(bytes) | Item::FirstBinary(bytes) => {
self.start_frame(&bytes);
return;
}
Item::Continue(bytes) => {
self.continue_frame(&bytes);
return;
}
Item::Last(bytes) => {
self.continue_frame(&bytes);
self.finish_frame()
}
},
Err(error) => {
log::error!("{:?}", error);
ctx.stop();
-196
View File
@@ -1,196 +0,0 @@
use crate::node::NodeDetails;
use crate::types::{Block, BlockHash, BlockNumber, ConnId};
use crate::util::Hash;
use actix::prelude::*;
use serde::de::IgnoredAny;
use serde::Deserialize;
#[derive(Deserialize, Debug, Message)]
#[rtype(result = "()")]
#[serde(untagged)]
pub enum NodeMessage {
V1 {
#[serde(flatten)]
payload: Payload,
},
V2 {
id: ConnId,
payload: Payload,
},
}
impl NodeMessage {
/// Returns the connection ID or 0 if there is no ID.
pub fn id(&self) -> ConnId {
match self {
NodeMessage::V1 { .. } => 0,
NodeMessage::V2 { id, .. } => *id,
}
}
}
impl From<NodeMessage> for Payload {
fn from(msg: NodeMessage) -> Payload {
match msg {
NodeMessage::V1 { payload, .. } | NodeMessage::V2 { payload, .. } => payload,
}
}
}
#[derive(Deserialize, Debug)]
#[serde(tag = "msg")]
pub enum Payload {
#[serde(rename = "system.connected")]
SystemConnected(SystemConnected),
#[serde(rename = "system.interval")]
SystemInterval(SystemInterval),
#[serde(rename = "block.import")]
BlockImport(Block),
#[serde(rename = "notify.finalized")]
NotifyFinalized(Finalized),
#[serde(rename = "txpool.import")]
TxPoolImport(IgnoredAny),
#[serde(rename = "afg.finalized")]
AfgFinalized(AfgFinalized),
#[serde(rename = "afg.received_precommit")]
AfgReceivedPrecommit(AfgReceivedPrecommit),
#[serde(rename = "afg.received_prevote")]
AfgReceivedPrevote(AfgReceivedPrevote),
#[serde(rename = "afg.received_commit")]
AfgReceivedCommit(AfgReceivedCommit),
#[serde(rename = "afg.authority_set")]
AfgAuthoritySet(AfgAuthoritySet),
#[serde(rename = "afg.finalized_blocks_up_to")]
AfgFinalizedBlocksUpTo(IgnoredAny),
#[serde(rename = "aura.pre_sealed_block")]
AuraPreSealedBlock(IgnoredAny),
#[serde(rename = "prepared_block_for_proposing")]
PreparedBlockForProposing(IgnoredAny),
}
#[derive(Deserialize, Debug)]
pub struct SystemConnected {
pub genesis_hash: Hash,
#[serde(flatten)]
pub node: NodeDetails,
}
#[derive(Deserialize, Debug)]
pub struct SystemInterval {
pub peers: Option<u64>,
pub txcount: Option<u64>,
pub bandwidth_upload: Option<f64>,
pub bandwidth_download: Option<f64>,
pub finalized_height: Option<BlockNumber>,
pub finalized_hash: Option<BlockHash>,
#[serde(flatten)]
pub block: Option<Block>,
pub used_state_cache_size: Option<f32>,
}
#[derive(Deserialize, Debug)]
pub struct Finalized {
#[serde(rename = "best")]
pub hash: BlockHash,
pub height: Box<str>,
}
#[derive(Deserialize, Debug)]
pub struct AfgAuthoritySet {
pub authority_id: Box<str>,
pub authorities: Box<str>,
pub authority_set_id: Box<str>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct AfgFinalized {
pub finalized_hash: BlockHash,
pub finalized_number: Box<str>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct AfgReceived {
pub target_hash: BlockHash,
pub target_number: Box<str>,
pub voter: Option<Box<str>>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct AfgReceivedPrecommit {
#[serde(flatten)]
pub received: AfgReceived,
}
#[derive(Deserialize, Debug, Clone)]
pub struct AfgReceivedPrevote {
#[serde(flatten)]
pub received: AfgReceived,
}
#[derive(Deserialize, Debug, Clone)]
pub struct AfgReceivedCommit {
#[serde(flatten)]
pub received: AfgReceived,
}
impl Block {
pub fn zero() -> Self {
Block {
hash: BlockHash::from([0; 32]),
height: 0,
}
}
}
impl Payload {
pub fn best_block(&self) -> Option<&Block> {
match self {
Payload::BlockImport(block) => Some(block),
Payload::SystemInterval(SystemInterval { block, .. }) => block.as_ref(),
_ => None,
}
}
pub fn finalized_block(&self) -> Option<Block> {
match self {
Payload::SystemInterval(ref interval) => Some(Block {
hash: interval.finalized_hash?,
height: interval.finalized_height?,
}),
Payload::NotifyFinalized(ref finalized) => Some(Block {
hash: finalized.hash,
height: finalized.height.parse().ok()?,
}),
_ => None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn message_v1() {
let json = r#"{"msg":"notify.finalized","level":"INFO","ts":"2021-01-13T12:38:25.410794650+01:00","best":"0x031c3521ca2f9c673812d692fc330b9a18e18a2781e3f9976992f861fd3ea0cb","height":"50"}"#;
assert!(
matches!(
serde_json::from_str::<NodeMessage>(json).unwrap(),
NodeMessage::V1 { .. },
),
"message did not match variant V1",
);
}
#[test]
fn message_v2() {
let json = r#"{"id":1,"ts":"2021-01-13T12:22:20.053527101+01:00","payload":{"best":"0xcc41708573f2acaded9dd75e07dac2d4163d136ca35b3061c558d7a35a09dd8d","height":"209","msg":"notify.finalized"}}"#;
assert!(
matches!(
serde_json::from_str::<NodeMessage>(json).unwrap(),
NodeMessage::V2 { .. },
),
"message did not match variant V2",
);
}
}
-12
View File
@@ -1,13 +1 @@
use crate::node::message::Payload;
use serde::Deserialize;
pub mod connector;
/// Alias for the ID of the node connection
type ShardConnId = usize;
#[derive(Deserialize)]
pub struct ShardMessage {
pub conn_id: ShardConnId,
pub payload: Payload,
}
+98 -61
View File
@@ -1,23 +1,22 @@
use std::mem;
use std::time::{Duration, Instant};
use std::collections::BTreeMap;
use std::net::Ipv4Addr;
use crate::aggregator::{AddNode, Aggregator};
use crate::aggregator::{AddNode, Aggregator, NodeSource};
use crate::chain::{Chain, RemoveNode, UpdateNode};
use crate::shard::ShardMessage;
use crate::types::NodeId;
use crate::util::{DenseMap, Hash};
use crate::location::LocateRequest;
use actix::prelude::*;
use actix_http::ws::Item;
use actix_web_actors::ws::{self, CloseReason};
use bincode::Options;
use bytes::{Bytes, BytesMut};
use shared::types::NodeId;
use shared::util::Hash;
use shared::ws::{MultipartHandler, WsMessage};
use shared::shard::{ShardMessage, ShardConnId, BackendMessage};
/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
/// Continuation buffer limit, 10mb
const CONT_BUF_LIMIT: usize = 10 * 1024 * 1024;
pub struct ShardConnector {
/// Client must send ping at least once every 60 seconds (CLIENT_TIMEOUT),
@@ -26,12 +25,16 @@ pub struct ShardConnector {
aggregator: Addr<Aggregator>,
/// Genesis hash of the chain this connection will be submitting data for
genesis_hash: Hash,
/// Chain address to which this multiplex connector is delegating messages
/// Chain address to which this shard connector is delegating messages
chain: Option<Addr<Chain>>,
/// Mapping `ShardConnId` to `NodeId`
nodes: DenseMap<NodeId>,
/// Buffer for constructing continuation messages
contbuf: BytesMut,
/// Transient mapping of `ShardConnId` to external IP address.
ips: BTreeMap<ShardConnId, Ipv4Addr>,
/// Mapping of `ShardConnId` to initialized `NodeId`s.
nodes: BTreeMap<ShardConnId, NodeId>,
/// Actix address of location services
locator: Recipient<LocateRequest>,
/// Container for handling continuation messages
multipart: MultipartHandler,
}
impl Actor for ShardConnector {
@@ -43,7 +46,7 @@ impl Actor for ShardConnector {
fn stopped(&mut self, _: &mut Self::Context) {
if let Some(ref chain) = self.chain {
for (_, nid) in self.nodes.iter() {
for nid in self.nodes.values() {
chain.do_send(RemoveNode(*nid))
}
}
@@ -51,17 +54,31 @@ impl Actor for ShardConnector {
}
impl ShardConnector {
pub fn new(aggregator: Addr<Aggregator>, genesis_hash: Hash) -> Self {
pub fn new(
aggregator: Addr<Aggregator>,
locator: Recipient<LocateRequest>,
genesis_hash: Hash,
) -> Self {
Self {
hb: Instant::now(),
aggregator,
genesis_hash,
chain: None,
nodes: DenseMap::new(),
contbuf: BytesMut::new(),
ips: BTreeMap::new(),
nodes: BTreeMap::new(),
locator,
multipart: MultipartHandler::default(),
}
}
fn shard_send(msg: BackendMessage, ctx: &mut <Self as Actor>::Context) {
let bytes = bincode::options().serialize(&msg).expect("Must be able to serialize to vec; qed");
println!("Sending back {} bytes", bytes.len());
ctx.binary(bytes);
}
fn heartbeat(&self, ctx: &mut <Self as Actor>::Context) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
// check client heartbeats
@@ -77,30 +94,66 @@ impl ShardConnector {
}
fn handle_message(&mut self, msg: ShardMessage, ctx: &mut <Self as Actor>::Context) {
let ShardMessage { conn_id, payload } = msg;
println!("{:?}", msg);
// TODO: get `NodeId` for `ShardConnId` and proxy payload to `self.chain`.
}
match msg {
ShardMessage::AddNode { ip, node, sid } => {
if let Some(ip) = ip {
self.ips.insert(sid, ip);
}
fn start_frame(&mut self, bytes: &[u8]) {
if !self.contbuf.is_empty() {
log::error!("Unused continuation buffer");
self.contbuf.clear();
}
self.continue_frame(bytes);
}
fn continue_frame(&mut self, bytes: &[u8]) {
if self.contbuf.len() + bytes.len() <= CONT_BUF_LIMIT {
self.contbuf.extend_from_slice(&bytes);
} else {
log::error!("Continuation buffer overflow");
self.contbuf = BytesMut::new();
self.aggregator.do_send(AddNode {
node,
genesis_hash: self.genesis_hash,
source: NodeSource::Shard {
sid,
shard_connector: ctx.address(),
}
});
},
ShardMessage::UpdateNode { nid, payload } => {
if let Some(chain) = self.chain.as_ref() {
chain.do_send(UpdateNode {
nid,
payload,
});
}
},
}
}
}
fn finish_frame(&mut self) -> Bytes {
mem::replace(&mut self.contbuf, BytesMut::new()).freeze()
#[derive(Message)]
#[rtype(result = "()")]
pub struct Initialize {
pub nid: NodeId,
pub sid: ShardConnId,
pub chain: Addr<Chain>,
}
impl Handler<Initialize> for ShardConnector {
type Result = ();
fn handle(&mut self, msg: Initialize, ctx: &mut Self::Context) {
let Initialize {
nid,
sid,
chain,
} = msg;
log::trace!(target: "ShardConnector::Initialize", "Initializing a node, nid={}, on conn_id={}", nid, 0);
if self.chain.is_none() {
self.chain = Some(chain.clone());
}
let be_msg = BackendMessage::Initialize { sid, nid };
Self::shard_send(be_msg, ctx);
// Acquire the node's physical location
if let Some(ip) = self.ips.remove(&sid) {
let _ = self.locator.do_send(LocateRequest { ip, nid, chain });
}
}
}
@@ -108,34 +161,18 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for ShardConnector {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
self.hb = Instant::now();
let data = match msg {
Ok(ws::Message::Ping(msg)) => {
let data = match msg.map(|msg| self.multipart.handle(msg)) {
Ok(WsMessage::Nop) => return,
Ok(WsMessage::Ping(msg)) => {
ctx.pong(&msg);
return;
}
Ok(ws::Message::Pong(_)) => return,
Ok(ws::Message::Text(text)) => text.into_bytes(),
Ok(ws::Message::Binary(data)) => data,
Ok(ws::Message::Close(reason)) => {
Ok(WsMessage::Data(data)) => data,
Ok(WsMessage::Close(reason)) => {
ctx.close(reason);
ctx.stop();
return;
}
Ok(ws::Message::Nop) => return,
Ok(ws::Message::Continuation(cont)) => match cont {
Item::FirstText(bytes) | Item::FirstBinary(bytes) => {
self.start_frame(&bytes);
return;
}
Item::Continue(bytes) => {
self.continue_frame(&bytes);
return;
}
Item::Last(bytes) => {
self.continue_frame(&bytes);
self.finish_frame()
}
},
Err(error) => {
log::error!("{:?}", error);
ctx.stop();
@@ -145,12 +182,12 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for ShardConnector {
match bincode::options().deserialize(&data) {
Ok(msg) => self.handle_message(msg, ctx),
#[cfg(debug)]
// #[cfg(debug)]
Err(err) => {
log::warn!("Failed to parse shard message: {}", err,)
}
#[cfg(not(debug))]
Err(_) => (),
// #[cfg(not(debug))]
// Err(_) => (),
}
}
}
-155
View File
@@ -1,155 +0,0 @@
use serde::ser::{Serialize, SerializeTuple, Serializer};
use serde::Deserialize;
use crate::util::{now, MeanList};
pub type NodeId = usize;
pub type ConnId = u64;
pub type BlockNumber = u64;
pub type Timestamp = u64;
pub type Address = Box<str>;
pub use primitive_types::H256 as BlockHash;
#[derive(Deserialize, Debug, Clone)]
pub struct NodeDetails {
pub chain: Box<str>,
pub name: Box<str>,
pub implementation: Box<str>,
pub version: Box<str>,
pub validator: Option<Box<str>>,
pub network_id: Option<Box<str>>,
pub startup_time: Option<Box<str>>,
}
#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct NodeStats {
pub peers: u64,
pub txcount: u64,
}
#[derive(Default)]
pub struct NodeIO {
pub used_state_cache_size: MeanList<f32>,
}
#[derive(Deserialize, Debug, Clone, Copy)]
pub struct Block {
#[serde(rename = "best")]
pub hash: BlockHash,
pub height: BlockNumber,
}
#[derive(Debug, Clone, Copy)]
pub struct BlockDetails {
pub block: Block,
pub block_time: u64,
pub block_timestamp: u64,
pub propagation_time: Option<u64>,
}
impl Default for BlockDetails {
fn default() -> Self {
BlockDetails {
block: Block::zero(),
block_timestamp: now(),
block_time: 0,
propagation_time: None,
}
}
}
#[derive(Default)]
pub struct NodeHardware {
/// Upload uses means
pub upload: MeanList<f64>,
/// Download uses means
pub download: MeanList<f64>,
/// Stampchange uses means
pub chart_stamps: MeanList<f64>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct NodeLocation {
pub latitude: f32,
pub longitude: f32,
pub city: Box<str>,
}
impl Serialize for NodeDetails {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut tup = serializer.serialize_tuple(6)?;
tup.serialize_element(&self.name)?;
tup.serialize_element(&self.implementation)?;
tup.serialize_element(&self.version)?;
tup.serialize_element(&self.validator)?;
tup.serialize_element(&self.network_id)?;
tup.end()
}
}
impl Serialize for NodeStats {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut tup = serializer.serialize_tuple(2)?;
tup.serialize_element(&self.peers)?;
tup.serialize_element(&self.txcount)?;
tup.end()
}
}
impl Serialize for NodeIO {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut tup = serializer.serialize_tuple(1)?;
tup.serialize_element(self.used_state_cache_size.slice())?;
tup.end()
}
}
impl Serialize for BlockDetails {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut tup = serializer.serialize_tuple(5)?;
tup.serialize_element(&self.block.height)?;
tup.serialize_element(&self.block.hash)?;
tup.serialize_element(&self.block_time)?;
tup.serialize_element(&self.block_timestamp)?;
tup.serialize_element(&self.propagation_time)?;
tup.end()
}
}
impl Serialize for NodeLocation {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut tup = serializer.serialize_tuple(3)?;
tup.serialize_element(&self.latitude)?;
tup.serialize_element(&self.longitude)?;
tup.serialize_element(&&*self.city)?;
tup.end()
}
}
impl Serialize for NodeHardware {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut tup = serializer.serialize_tuple(3)?;
tup.serialize_element(self.upload.slice())?;
tup.serialize_element(self.download.slice())?;
tup.serialize_element(self.chart_stamps.slice())?;
tup.end()
}
}
-31
View File
@@ -1,31 +0,0 @@
mod dense_map;
mod hash;
mod location;
mod mean_list;
mod num_stats;
pub use dense_map::DenseMap;
pub use hash::Hash;
pub use location::{LocateRequest, Locator, LocatorFactory};
pub use mean_list::MeanList;
pub use num_stats::NumStats;
pub fn fnv<D: AsRef<[u8]>>(data: D) -> u64 {
use fnv::FnvHasher;
use std::hash::Hasher;
let mut hasher = FnvHasher::default();
hasher.write(data.as_ref());
hasher.finish()
}
/// Returns current unix time in ms (compatible with JS Date.now())
pub fn now() -> u64 {
use std::time::SystemTime;
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("System time must be configured to be post Unix Epoch start; qed")
.as_millis() as u64
}
-80
View File
@@ -1,80 +0,0 @@
pub type Id = usize;
pub struct DenseMap<T> {
/// List of retired indexes that can be re-used
retired: Vec<Id>,
/// All items
items: Vec<Option<T>>,
}
impl<T> DenseMap<T> {
pub fn new() -> Self {
DenseMap {
retired: Vec::new(),
items: Vec::new(),
}
}
pub fn add(&mut self, item: T) -> Id {
self.add_with(|_| item)
}
pub fn add_with<F>(&mut self, f: F) -> Id
where
F: FnOnce(Id) -> T,
{
match self.retired.pop() {
Some(id) => {
self.items[id] = Some(f(id));
id
}
None => {
let id = self.items.len();
self.items.push(Some(f(id)));
id
}
}
}
pub fn get(&self, id: Id) -> Option<&T> {
self.items.get(id).and_then(|item| item.as_ref())
}
pub fn get_mut(&mut self, id: Id) -> Option<&mut T> {
self.items.get_mut(id).and_then(|item| item.as_mut())
}
pub fn remove(&mut self, id: Id) -> Option<T> {
let old = self.items.get_mut(id).and_then(|item| item.take());
if old.is_some() {
// something was actually removed, so lets add the id to
// the list of retired ids!
self.retired.push(id);
}
old
}
pub fn iter(&self) -> impl Iterator<Item = (Id, &T)> + '_ {
self.items
.iter()
.enumerate()
.filter_map(|(id, item)| Some((id, item.as_ref()?)))
}
pub fn iter_mut(&mut self) -> impl Iterator<Item = (Id, &mut T)> + '_ {
self.items
.iter_mut()
.enumerate()
.filter_map(|(id, item)| Some((id, item.as_mut()?)))
}
pub fn len(&self) -> usize {
self.items.len() - self.retired.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
-89
View File
@@ -1,89 +0,0 @@
use std::fmt::{self, Debug, Display};
use std::str::FromStr;
use actix_web::error::ResponseError;
use serde::de::{self, Deserialize, Deserializer, Unexpected, Visitor};
const HASH_BYTES: usize = 32;
/// Newtype wrapper for 32-byte hash values, implementing readable `Debug` and `serde::Deserialize`.
// We could use primitive_types::H256 here, but opted for a custom type to avoid more dependencies.
#[derive(Hash, PartialEq, Eq, Clone, Copy)]
pub struct Hash([u8; HASH_BYTES]);
struct HashVisitor;
impl<'de> Visitor<'de> for HashVisitor {
type Value = Hash;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("hexidecimal string of 32 bytes beginning with 0x")
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
value
.parse()
.map_err(|_| de::Error::invalid_value(Unexpected::Str(value), &self))
}
}
impl FromStr for Hash {
type Err = HashParseError;
fn from_str(value: &str) -> Result<Self, Self::Err> {
if !value.starts_with("0x") {
return Err(HashParseError::InvalidPrefix);
}
let mut hash = [0; HASH_BYTES];
hex::decode_to_slice(&value[2..], &mut hash).map_err(HashParseError::HexError)?;
Ok(Hash(hash))
}
}
impl<'de> Deserialize<'de> for Hash {
fn deserialize<D>(deserializer: D) -> Result<Hash, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_str(HashVisitor)
}
}
impl Display for Hash {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("0x")?;
let mut ascii = [0; HASH_BYTES * 2];
hex::encode_to_slice(self.0, &mut ascii)
.expect("Encoding 32 bytes into 64 bytes of ascii; qed");
f.write_str(std::str::from_utf8(&ascii).expect("ASCII hex encoded bytes canot fail; qed"))
}
}
impl Debug for Hash {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Display::fmt(self, f)
}
}
#[derive(thiserror::Error, Debug)]
pub enum HashParseError {
HexError(hex::FromHexError),
InvalidPrefix,
}
impl Display for HashParseError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Debug::fmt(self, f)
}
}
impl ResponseError for HashParseError {}
-79
View File
@@ -1,79 +0,0 @@
use num_traits::{Float, Zero};
use std::ops::AddAssign;
pub struct MeanList<T>
where
T: Float + AddAssign + Zero + From<u8>,
{
period_sum: T,
period_count: u8,
mean_index: u8,
means: [T; 20],
ticks_per_mean: u8,
}
impl<T> Default for MeanList<T>
where
T: Float + AddAssign + Zero + From<u8>,
{
fn default() -> MeanList<T> {
MeanList {
period_sum: T::zero(),
period_count: 0,
mean_index: 0,
means: [T::zero(); 20],
ticks_per_mean: 1,
}
}
}
impl<T> MeanList<T>
where
T: Float + AddAssign + Zero + From<u8>,
{
pub fn slice(&self) -> &[T] {
&self.means[..usize::from(self.mean_index)]
}
pub fn push(&mut self, val: T) -> bool {
if self.mean_index == 20 && self.ticks_per_mean < 32 {
self.squash_means();
}
self.period_sum += val;
self.period_count += 1;
if self.period_count == self.ticks_per_mean {
self.push_mean();
true
} else {
false
}
}
fn push_mean(&mut self) {
let mean = self.period_sum / std::convert::From::from(self.period_count);
if self.mean_index == 20 && self.ticks_per_mean == 32 {
self.means.rotate_left(1);
self.means[19] = mean;
} else {
self.means[usize::from(self.mean_index)] = mean;
self.mean_index += 1;
}
self.period_sum = T::zero();
self.period_count = 0;
}
fn squash_means(&mut self) {
self.ticks_per_mean *= 2;
self.mean_index = 10;
for i in 0..10 {
let i2 = i * 2;
self.means[i] = (self.means[i2] + self.means[i2 + 1]) / std::convert::From::from(2)
}
}
}
-104
View File
@@ -1,104 +0,0 @@
use num_traits::{Bounded, NumOps, Zero};
use std::convert::TryFrom;
use std::iter::Sum;
/// Keep track of last N numbers pushed onto internal stack.
/// Provides means to get an average of said numbers.
pub struct NumStats<T> {
stack: Box<[T]>,
index: usize,
sum: T,
}
impl<T: NumOps + Zero + Bounded + Copy + Sum + TryFrom<usize>> NumStats<T> {
pub fn new(size: usize) -> Self {
NumStats {
stack: vec![T::zero(); size].into_boxed_slice(),
index: 0,
sum: T::zero(),
}
}
pub fn push(&mut self, val: T) {
let slot = &mut self.stack[self.index % self.stack.len()];
self.sum = (self.sum + val) - *slot;
*slot = val;
self.index += 1;
}
pub fn average(&self) -> T {
let cap = std::cmp::min(self.index, self.stack.len());
if cap == 0 {
return T::zero();
}
let cap = T::try_from(cap).unwrap_or_else(|_| T::max_value());
self.sum / cap
}
pub fn reset(&mut self) {
self.index = 0;
self.sum = T::zero();
for val in self.stack.iter_mut() {
*val = T::zero();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn calculates_correct_average() {
let mut stats: NumStats<u64> = NumStats::new(10);
stats.push(3);
stats.push(7);
assert_eq!(stats.average(), 5);
}
#[test]
fn calculates_correct_average_over_bounds() {
let mut stats: NumStats<u64> = NumStats::new(10);
stats.push(100);
for _ in 0..9 {
stats.push(0);
}
assert_eq!(stats.average(), 10);
stats.push(0);
assert_eq!(stats.average(), 0);
}
#[test]
fn resets_properly() {
let mut stats: NumStats<u64> = NumStats::new(10);
for _ in 0..10 {
stats.push(100);
}
assert_eq!(stats.average(), 100);
stats.reset();
assert_eq!(stats.average(), 0);
stats.push(7);
stats.push(3);
assert_eq!(stats.average(), 5);
}
}