AFG messaging (#210)

* First draft of afg messaging

* AfgReceivedPrevote, AfgReceivedPrecommit and AfgFinalized added to rust backend

* Tidy up
This commit is contained in:
Maciej Hirsz
2019-12-06 18:21:25 +01:00
committed by GitHub
parent 92fb9d28de
commit 0a89382127
7 changed files with 190 additions and 10 deletions
+36
View File
@@ -102,6 +102,20 @@ impl Message for Subscribe {
type Result = bool; type Result = bool;
} }
/// Message sent from the FeedConnector to the Aggregator consensus requested
#[derive(Message)]
pub struct SendFinality {
pub chain: Label,
pub fid: FeedId,
}
/// Message sent from the FeedConnector to the Aggregator no more consensus required
#[derive(Message)]
pub struct NoMoreFinality {
pub chain: Label,
pub fid: FeedId,
}
/// Message sent from the FeedConnector to the Aggregator when first connected /// Message sent from the FeedConnector to the Aggregator when first connected
#[derive(Message)] #[derive(Message)]
pub struct Connect(pub Addr<FeedConnector>); pub struct Connect(pub Addr<FeedConnector>);
@@ -165,6 +179,28 @@ impl Handler<Subscribe> for Aggregator {
} }
} }
impl Handler<SendFinality> for Aggregator {
type Result = ();
fn handle(&mut self, msg: SendFinality, _: &mut Self::Context) {
let SendFinality { chain, fid } = msg;
if let Some(chain) = self.get_chain(&chain) {
chain.addr.do_send(chain::SendFinality(fid));
}
}
}
impl Handler<NoMoreFinality> for Aggregator {
type Result = ();
fn handle(&mut self, msg: NoMoreFinality, _: &mut Self::Context) {
let NoMoreFinality { chain, fid } = msg;
if let Some(chain) = self.get_chain(&chain) {
chain.addr.do_send(chain::NoMoreFinality(fid));
}
}
}
impl Handler<Connect> for Aggregator { impl Handler<Connect> for Aggregator {
type Result = (); type Result = ();
+78 -1
View File
@@ -1,13 +1,14 @@
use actix::prelude::*; use actix::prelude::*;
use std::sync::Arc; use std::sync::Arc;
use bytes::Bytes; use bytes::Bytes;
use rustc_hash::FxHashMap;
use crate::aggregator::{Aggregator, DropChain, NodeCount}; use crate::aggregator::{Aggregator, DropChain, NodeCount};
use crate::node::{Node, connector::Initialize, message::{NodeMessage, Details}}; use crate::node::{Node, connector::Initialize, message::{NodeMessage, Details}};
use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed}; use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed};
use crate::feed::{self, FeedMessageSerializer}; use crate::feed::{self, FeedMessageSerializer};
use crate::util::{DenseMap, NumStats, now}; use crate::util::{DenseMap, NumStats, now};
use crate::types::{NodeId, NodeDetails, NodeLocation, Block, Timestamp}; use crate::types::{NodeId, NodeDetails, NodeLocation, Block, Timestamp, BlockNumber};
const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes
@@ -24,6 +25,8 @@ pub struct Chain {
nodes: DenseMap<Node>, nodes: DenseMap<Node>,
/// Dense mapping of FeedId -> Addr<FeedConnector>, /// Dense mapping of FeedId -> Addr<FeedConnector>,
feeds: DenseMap<Addr<FeedConnector>>, feeds: DenseMap<Addr<FeedConnector>>,
/// Mapping of FeedId -> Addr<FeedConnector> for feeds requiring finality info,
finality_feeds: FxHashMap<FeedId, Addr<FeedConnector>>,
/// Best block /// Best block
best: Block, best: Block,
/// Finalized block /// Finalized block
@@ -48,6 +51,7 @@ impl Chain {
label, label,
nodes: DenseMap::new(), nodes: DenseMap::new(),
feeds: DenseMap::new(), feeds: DenseMap::new(),
finality_feeds: FxHashMap::default(),
best: Block::zero(), best: Block::zero(),
finalized: Block::zero(), finalized: Block::zero(),
block_times: NumStats::new(50), block_times: NumStats::new(50),
@@ -65,6 +69,14 @@ impl Chain {
} }
} }
fn broadcast_finality(&mut self) {
if let Some(msg) = self.serializer.finalize() {
for feed in self.finality_feeds.values() {
feed.do_send(msg.clone());
}
}
}
/// Triggered when the number of nodes in this chain has changed, Aggregator will /// Triggered when the number of nodes in this chain has changed, Aggregator will
/// propagate new counts to all connected feeds /// propagate new counts to all connected feeds
fn update_count(&self) { fn update_count(&self) {
@@ -155,6 +167,12 @@ pub struct Subscribe(pub Addr<FeedConnector>);
#[derive(Message)] #[derive(Message)]
pub struct Unsubscribe(pub FeedId); pub struct Unsubscribe(pub FeedId);
#[derive(Message)]
pub struct SendFinality(pub FeedId);
#[derive(Message)]
pub struct NoMoreFinality(pub FeedId);
/// Message sent from the NodeConnector to the Chain when it receives location data /// Message sent from the NodeConnector to the Chain when it receives location data
#[derive(Message)] #[derive(Message)]
pub struct LocateNode { pub struct LocateNode {
@@ -266,6 +284,40 @@ impl Handler<UpdateNode> for Chain {
self.broadcast(); self.broadcast();
return; return;
} }
Details::AfgFinalized(finalized) => {
if let Ok(finalized_number) = finalized.finalized_number.parse::<BlockNumber>() {
if let Some(addr) = node.details().validator.clone() {
self.serializer.push(feed::AfgFinalized(addr, finalized_number,
finalized.finalized_hash));
self.broadcast_finality();
}
}
return;
}
Details::AfgReceivedPrecommit(precommit) => {
if let Ok(finalized_number) = precommit.received.target_number.parse::<BlockNumber>() {
if let Some(addr) = node.details().validator.clone() {
let voter = precommit.received.voter.clone();
self.serializer.push(feed::AfgReceivedPrecommit(addr, finalized_number,
precommit.received.target_hash, voter));
self.broadcast_finality();
}
}
return;
}
Details::AfgReceivedPrevote(prevote) => {
if let Ok(finalized_number) = prevote.received.target_number.parse::<BlockNumber>() {
if let Some(addr) = node.details().validator.clone() {
let voter = prevote.received.voter.clone();
self.serializer.push(feed::AfgReceivedPrevote(addr, finalized_number,
prevote.received.target_hash, voter));
self.broadcast_finality();
}
}
return;
}
Details::AfgReceivedCommit(_) => {
}
_ => (), _ => (),
} }
@@ -350,6 +402,30 @@ impl Handler<Subscribe> for Chain {
} }
} }
impl Handler<SendFinality> for Chain {
type Result = ();
fn handle(&mut self, msg: SendFinality, _ctx: &mut Self::Context) {
let SendFinality(fid) = msg;
if let Some(feed) = self.feeds.get(fid) {
self.finality_feeds.insert(fid, feed.clone());
}
// info!("Added new finality feed {}", fid);
}
}
impl Handler<NoMoreFinality> for Chain {
type Result = ();
fn handle(&mut self, msg: NoMoreFinality, _: &mut Self::Context) {
let NoMoreFinality(fid) = msg;
// info!("Removed finality feed {}", fid);
self.finality_feeds.remove(&fid);
}
}
impl Handler<Unsubscribe> for Chain { impl Handler<Unsubscribe> for Chain {
type Result = (); type Result = ();
@@ -365,6 +441,7 @@ impl Handler<Unsubscribe> for Chain {
} }
self.feeds.remove(fid); self.feeds.remove(fid);
self.finality_feeds.remove(&fid);
} }
} }
+17 -1
View File
@@ -4,7 +4,7 @@ use serde::ser::{Serializer, SerializeTuple};
use serde_json::to_writer; use serde_json::to_writer;
use crate::node::Node; use crate::node::Node;
use crate::types::{ use crate::types::{
NodeId, NodeStats, NodeHardware, BlockNumber, BlockHash, BlockDetails, Timestamp, NodeId, NodeStats, NodeHardware, BlockNumber, BlockHash, BlockDetails, Timestamp, Address,
}; };
pub mod connector; pub mod connector;
@@ -86,6 +86,10 @@ actions! {
0x0D: SubscribedTo<'_>, 0x0D: SubscribedTo<'_>,
0x0E: UnsubscribedFrom<'_>, 0x0E: UnsubscribedFrom<'_>,
0x0F: Pong<'_>, 0x0F: Pong<'_>,
0x10: AfgFinalized,
0x11: AfgReceivedPrevote,
0x12: AfgReceivedPrecommit,
0x13: AfgAuthoritySet,
0x14: StaleNode, 0x14: StaleNode,
} }
@@ -136,6 +140,18 @@ pub struct UnsubscribedFrom<'a>(pub &'a str);
#[derive(Serialize)] #[derive(Serialize)]
pub struct Pong<'a>(pub &'a str); pub struct Pong<'a>(pub &'a str);
#[derive(Serialize)]
pub struct AfgFinalized(pub Address, pub BlockNumber, pub BlockHash);
#[derive(Serialize)]
pub struct AfgReceivedPrevote(pub Address, pub BlockNumber, pub BlockHash, pub Address);
#[derive(Serialize)]
pub struct AfgReceivedPrecommit(pub Address, pub BlockNumber, pub BlockHash, pub Address);
#[derive(Serialize)]
pub struct AfgAuthoritySet(pub Address, pub Address, pub Address, pub BlockNumber, pub BlockHash);
#[derive(Serialize)] #[derive(Serialize)]
pub struct StaleNode(pub NodeId); pub struct StaleNode(pub NodeId);
+16 -2
View File
@@ -2,7 +2,7 @@ use std::time::{Duration, Instant};
use bytes::Bytes; use bytes::Bytes;
use actix::prelude::*; use actix::prelude::*;
use actix_web_actors::ws; use actix_web_actors::ws;
use crate::aggregator::{Aggregator, Connect, Disconnect, Subscribe}; use crate::aggregator::{Aggregator, Connect, Disconnect, Subscribe, SendFinality, NoMoreFinality};
use crate::chain::Unsubscribe; use crate::chain::Unsubscribe;
use crate::feed::{FeedMessageSerializer, Pong}; use crate::feed::{FeedMessageSerializer, Pong};
use crate::util::fnv; use crate::util::fnv;
@@ -76,7 +76,7 @@ impl FeedConnector {
} }
fn handle_cmd(&mut self, cmd: &str, payload: &str, ctx: &mut <Self as Actor>::Context) { fn handle_cmd(&mut self, cmd: &str, payload: &str, ctx: &mut <Self as Actor>::Context) {
match cmd { match cmd {
"subscribe" => { "subscribe" => {
match fnv(payload) { match fnv(payload) {
hash if hash == self.chain_hash => return, hash if hash == self.chain_hash => return,
@@ -99,6 +99,18 @@ impl FeedConnector {
}) })
.wait(ctx); .wait(ctx);
} }
"send-finality" => {
self.aggregator.do_send(SendFinality {
chain: payload.into(),
fid: self.fid_chain,
});
}
"no-more-finality" => {
self.aggregator.do_send(NoMoreFinality {
chain: payload.into(),
fid: self.fid_chain,
});
}
"ping" => { "ping" => {
self.serializer.push(Pong(payload)); self.serializer.push(Pong(payload));
if let Some(serialized) = self.serializer.finalize() { if let Some(serialized) = self.serializer.finalize() {
@@ -141,6 +153,8 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for FeedConnector {
let cmd = &text[..idx]; let cmd = &text[..idx];
let payload = &text[idx+1..]; let payload = &text[idx+1..];
info!("New FEED message: {}", cmd);
self.handle_cmd(cmd, payload, ctx); self.handle_cmd(cmd, payload, ctx);
} }
} }
+5 -2
View File
@@ -141,9 +141,12 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for NodeConnector {
}; };
match serde_json::from_slice(&data) { match serde_json::from_slice(&data) {
Ok(msg) => self.handle_message(msg, data, ctx), Ok(msg) => {
// info!("New node message: {}", std::str::from_utf8(&data).unwrap_or_else(|_| "INVALID UTF8"));
self.handle_message(msg, data, ctx)
},
Err(err) => { Err(err) => {
let data: &[u8] = data.get(..256).unwrap_or_else(|| &data); let data: &[u8] = data.get(..512).unwrap_or_else(|| &data);
warn!("Failed to parse node message: {} {}", err, std::str::from_utf8(data).unwrap_or_else(|_| "INVALID UTF8")) warn!("Failed to parse node message: {} {}", err, std::str::from_utf8(data).unwrap_or_else(|_| "INVALID UTF8"))
}, },
} }
+37 -4
View File
@@ -37,13 +37,13 @@ pub enum Details {
#[serde(rename = "txpool.import")] #[serde(rename = "txpool.import")]
TxPoolImport(IgnoredAny), TxPoolImport(IgnoredAny),
#[serde(rename = "afg.finalized")] #[serde(rename = "afg.finalized")]
AfgFinalized(IgnoredAny), AfgFinalized(AfgFinalized),
#[serde(rename = "afg.received_precommit")] #[serde(rename = "afg.received_precommit")]
AfgReceivedPrecommit(IgnoredAny), AfgReceivedPrecommit(AfgReceivedPrecommit),
#[serde(rename = "afg.received_prevote")] #[serde(rename = "afg.received_prevote")]
AfgReceivedPrevote(IgnoredAny), AfgReceivedPrevote(AfgReceivedPrevote),
#[serde(rename = "afg.received_commit")] #[serde(rename = "afg.received_commit")]
AfgReceivedCommit(IgnoredAny), AfgReceivedCommit(AfgReceivedCommit),
#[serde(rename = "afg.authority_set")] #[serde(rename = "afg.authority_set")]
AfgAuthoritySet(AfgAuthoritySet), AfgAuthoritySet(AfgAuthoritySet),
#[serde(rename = "afg.finalized_blocks_up_to")] #[serde(rename = "afg.finalized_blocks_up_to")]
@@ -86,6 +86,39 @@ pub struct Finalized {
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
pub struct AfgAuthoritySet { pub struct AfgAuthoritySet {
pub authority_id: Box<str>, pub authority_id: Box<str>,
pub authorities: Box<str>,
pub authority_set_id: Box<str>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct AfgFinalized {
pub finalized_hash: BlockHash,
pub finalized_number: Box<str>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct AfgReceived {
pub target_hash: BlockHash,
pub target_number: Box<str>,
pub voter: Box<str>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct AfgReceivedPrecommit {
#[serde(flatten)]
pub received: AfgReceived,
}
#[derive(Deserialize, Debug, Clone)]
pub struct AfgReceivedPrevote {
#[serde(flatten)]
pub received: AfgReceived,
}
#[derive(Deserialize, Debug, Clone)]
pub struct AfgReceivedCommit {
#[serde(flatten)]
pub received: AfgReceived,
} }
impl Block { impl Block {
+1
View File
@@ -6,6 +6,7 @@ use crate::util::MeanList;
pub type NodeId = usize; pub type NodeId = usize;
pub type BlockNumber = u64; pub type BlockNumber = u64;
pub type Timestamp = u64; pub type Timestamp = u64;
pub type Address = Box<str>;
pub use primitive_types::H256 as BlockHash; pub use primitive_types::H256 as BlockHash;
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]