From 0a89382127b9fb1b95d144cae816c46582975e93 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> Date: Fri, 6 Dec 2019 18:21:25 +0100 Subject: [PATCH] AFG messaging (#210) * First draft of afg messaging * AfgReceivedPrevote, AfgReceivedPrecommit and AfgFinalized added to rust backend * Tidy up --- backend/src/aggregator.rs | 36 ++++++++++++++++ backend/src/chain.rs | 79 ++++++++++++++++++++++++++++++++++- backend/src/feed.rs | 18 +++++++- backend/src/feed/connector.rs | 18 +++++++- backend/src/node/connector.rs | 7 +++- backend/src/node/message.rs | 41 ++++++++++++++++-- backend/src/types.rs | 1 + 7 files changed, 190 insertions(+), 10 deletions(-) diff --git a/backend/src/aggregator.rs b/backend/src/aggregator.rs index 3bb0667..e222279 100644 --- a/backend/src/aggregator.rs +++ b/backend/src/aggregator.rs @@ -102,6 +102,20 @@ impl Message for Subscribe { type Result = bool; } +/// Message sent from the FeedConnector to the Aggregator consensus requested +#[derive(Message)] +pub struct SendFinality { + pub chain: Label, + pub fid: FeedId, +} + +/// Message sent from the FeedConnector to the Aggregator no more consensus required +#[derive(Message)] +pub struct NoMoreFinality { + pub chain: Label, + pub fid: FeedId, +} + /// Message sent from the FeedConnector to the Aggregator when first connected #[derive(Message)] pub struct Connect(pub Addr); @@ -165,6 +179,28 @@ impl Handler for Aggregator { } } +impl Handler for Aggregator { + type Result = (); + + fn handle(&mut self, msg: SendFinality, _: &mut Self::Context) { + let SendFinality { chain, fid } = msg; + if let Some(chain) = self.get_chain(&chain) { + chain.addr.do_send(chain::SendFinality(fid)); + } + } +} + +impl Handler for Aggregator { + type Result = (); + + fn handle(&mut self, msg: NoMoreFinality, _: &mut Self::Context) { + let NoMoreFinality { chain, fid } = msg; + if let Some(chain) = self.get_chain(&chain) { + chain.addr.do_send(chain::NoMoreFinality(fid)); + } + } +} + impl Handler for Aggregator { type Result = (); diff --git a/backend/src/chain.rs b/backend/src/chain.rs index 40f8914..b2d03b8 100644 --- a/backend/src/chain.rs +++ b/backend/src/chain.rs @@ -1,13 +1,14 @@ use actix::prelude::*; use std::sync::Arc; use bytes::Bytes; +use rustc_hash::FxHashMap; use crate::aggregator::{Aggregator, DropChain, NodeCount}; use crate::node::{Node, connector::Initialize, message::{NodeMessage, Details}}; use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed}; use crate::feed::{self, FeedMessageSerializer}; use crate::util::{DenseMap, NumStats, now}; -use crate::types::{NodeId, NodeDetails, NodeLocation, Block, Timestamp}; +use crate::types::{NodeId, NodeDetails, NodeLocation, Block, Timestamp, BlockNumber}; const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes @@ -24,6 +25,8 @@ pub struct Chain { nodes: DenseMap, /// Dense mapping of FeedId -> Addr, feeds: DenseMap>, + /// Mapping of FeedId -> Addr for feeds requiring finality info, + finality_feeds: FxHashMap>, /// Best block best: Block, /// Finalized block @@ -48,6 +51,7 @@ impl Chain { label, nodes: DenseMap::new(), feeds: DenseMap::new(), + finality_feeds: FxHashMap::default(), best: Block::zero(), finalized: Block::zero(), block_times: NumStats::new(50), @@ -65,6 +69,14 @@ impl Chain { } } + fn broadcast_finality(&mut self) { + if let Some(msg) = self.serializer.finalize() { + for feed in self.finality_feeds.values() { + feed.do_send(msg.clone()); + } + } + } + /// Triggered when the number of nodes in this chain has changed, Aggregator will /// propagate new counts to all connected feeds fn update_count(&self) { @@ -155,6 +167,12 @@ pub struct Subscribe(pub Addr); #[derive(Message)] pub struct Unsubscribe(pub FeedId); +#[derive(Message)] +pub struct SendFinality(pub FeedId); + +#[derive(Message)] +pub struct NoMoreFinality(pub FeedId); + /// Message sent from the NodeConnector to the Chain when it receives location data #[derive(Message)] pub struct LocateNode { @@ -266,6 +284,40 @@ impl Handler for Chain { self.broadcast(); return; } + Details::AfgFinalized(finalized) => { + if let Ok(finalized_number) = finalized.finalized_number.parse::() { + if let Some(addr) = node.details().validator.clone() { + self.serializer.push(feed::AfgFinalized(addr, finalized_number, + finalized.finalized_hash)); + self.broadcast_finality(); + } + } + return; + } + Details::AfgReceivedPrecommit(precommit) => { + if let Ok(finalized_number) = precommit.received.target_number.parse::() { + if let Some(addr) = node.details().validator.clone() { + let voter = precommit.received.voter.clone(); + self.serializer.push(feed::AfgReceivedPrecommit(addr, finalized_number, + precommit.received.target_hash, voter)); + self.broadcast_finality(); + } + } + return; + } + Details::AfgReceivedPrevote(prevote) => { + if let Ok(finalized_number) = prevote.received.target_number.parse::() { + if let Some(addr) = node.details().validator.clone() { + let voter = prevote.received.voter.clone(); + self.serializer.push(feed::AfgReceivedPrevote(addr, finalized_number, + prevote.received.target_hash, voter)); + self.broadcast_finality(); + } + } + return; + } + Details::AfgReceivedCommit(_) => { + } _ => (), } @@ -350,6 +402,30 @@ impl Handler for Chain { } } +impl Handler for Chain { + type Result = (); + + fn handle(&mut self, msg: SendFinality, _ctx: &mut Self::Context) { + let SendFinality(fid) = msg; + if let Some(feed) = self.feeds.get(fid) { + self.finality_feeds.insert(fid, feed.clone()); + } + + // info!("Added new finality feed {}", fid); + } +} + +impl Handler for Chain { + type Result = (); + + fn handle(&mut self, msg: NoMoreFinality, _: &mut Self::Context) { + let NoMoreFinality(fid) = msg; + + // info!("Removed finality feed {}", fid); + self.finality_feeds.remove(&fid); + } +} + impl Handler for Chain { type Result = (); @@ -365,6 +441,7 @@ impl Handler for Chain { } self.feeds.remove(fid); + self.finality_feeds.remove(&fid); } } diff --git a/backend/src/feed.rs b/backend/src/feed.rs index fcf60d5..099ab1e 100644 --- a/backend/src/feed.rs +++ b/backend/src/feed.rs @@ -4,7 +4,7 @@ use serde::ser::{Serializer, SerializeTuple}; use serde_json::to_writer; use crate::node::Node; use crate::types::{ - NodeId, NodeStats, NodeHardware, BlockNumber, BlockHash, BlockDetails, Timestamp, + NodeId, NodeStats, NodeHardware, BlockNumber, BlockHash, BlockDetails, Timestamp, Address, }; pub mod connector; @@ -86,6 +86,10 @@ actions! { 0x0D: SubscribedTo<'_>, 0x0E: UnsubscribedFrom<'_>, 0x0F: Pong<'_>, + 0x10: AfgFinalized, + 0x11: AfgReceivedPrevote, + 0x12: AfgReceivedPrecommit, + 0x13: AfgAuthoritySet, 0x14: StaleNode, } @@ -136,6 +140,18 @@ pub struct UnsubscribedFrom<'a>(pub &'a str); #[derive(Serialize)] pub struct Pong<'a>(pub &'a str); +#[derive(Serialize)] +pub struct AfgFinalized(pub Address, pub BlockNumber, pub BlockHash); + +#[derive(Serialize)] +pub struct AfgReceivedPrevote(pub Address, pub BlockNumber, pub BlockHash, pub Address); + +#[derive(Serialize)] +pub struct AfgReceivedPrecommit(pub Address, pub BlockNumber, pub BlockHash, pub Address); + +#[derive(Serialize)] +pub struct AfgAuthoritySet(pub Address, pub Address, pub Address, pub BlockNumber, pub BlockHash); + #[derive(Serialize)] pub struct StaleNode(pub NodeId); diff --git a/backend/src/feed/connector.rs b/backend/src/feed/connector.rs index 5689c8a..cb27816 100644 --- a/backend/src/feed/connector.rs +++ b/backend/src/feed/connector.rs @@ -2,7 +2,7 @@ use std::time::{Duration, Instant}; use bytes::Bytes; use actix::prelude::*; use actix_web_actors::ws; -use crate::aggregator::{Aggregator, Connect, Disconnect, Subscribe}; +use crate::aggregator::{Aggregator, Connect, Disconnect, Subscribe, SendFinality, NoMoreFinality}; use crate::chain::Unsubscribe; use crate::feed::{FeedMessageSerializer, Pong}; use crate::util::fnv; @@ -76,7 +76,7 @@ impl FeedConnector { } fn handle_cmd(&mut self, cmd: &str, payload: &str, ctx: &mut ::Context) { - match cmd { + match cmd { "subscribe" => { match fnv(payload) { hash if hash == self.chain_hash => return, @@ -99,6 +99,18 @@ impl FeedConnector { }) .wait(ctx); } + "send-finality" => { + self.aggregator.do_send(SendFinality { + chain: payload.into(), + fid: self.fid_chain, + }); + } + "no-more-finality" => { + self.aggregator.do_send(NoMoreFinality { + chain: payload.into(), + fid: self.fid_chain, + }); + } "ping" => { self.serializer.push(Pong(payload)); if let Some(serialized) = self.serializer.finalize() { @@ -141,6 +153,8 @@ impl StreamHandler for FeedConnector { let cmd = &text[..idx]; let payload = &text[idx+1..]; + info!("New FEED message: {}", cmd); + self.handle_cmd(cmd, payload, ctx); } } diff --git a/backend/src/node/connector.rs b/backend/src/node/connector.rs index b81ae8b..7030de6 100644 --- a/backend/src/node/connector.rs +++ b/backend/src/node/connector.rs @@ -141,9 +141,12 @@ impl StreamHandler for NodeConnector { }; match serde_json::from_slice(&data) { - Ok(msg) => self.handle_message(msg, data, ctx), + Ok(msg) => { + // info!("New node message: {}", std::str::from_utf8(&data).unwrap_or_else(|_| "INVALID UTF8")); + self.handle_message(msg, data, ctx) + }, Err(err) => { - let data: &[u8] = data.get(..256).unwrap_or_else(|| &data); + let data: &[u8] = data.get(..512).unwrap_or_else(|| &data); warn!("Failed to parse node message: {} {}", err, std::str::from_utf8(data).unwrap_or_else(|_| "INVALID UTF8")) }, } diff --git a/backend/src/node/message.rs b/backend/src/node/message.rs index f4d4370..ae81226 100644 --- a/backend/src/node/message.rs +++ b/backend/src/node/message.rs @@ -37,13 +37,13 @@ pub enum Details { #[serde(rename = "txpool.import")] TxPoolImport(IgnoredAny), #[serde(rename = "afg.finalized")] - AfgFinalized(IgnoredAny), + AfgFinalized(AfgFinalized), #[serde(rename = "afg.received_precommit")] - AfgReceivedPrecommit(IgnoredAny), + AfgReceivedPrecommit(AfgReceivedPrecommit), #[serde(rename = "afg.received_prevote")] - AfgReceivedPrevote(IgnoredAny), + AfgReceivedPrevote(AfgReceivedPrevote), #[serde(rename = "afg.received_commit")] - AfgReceivedCommit(IgnoredAny), + AfgReceivedCommit(AfgReceivedCommit), #[serde(rename = "afg.authority_set")] AfgAuthoritySet(AfgAuthoritySet), #[serde(rename = "afg.finalized_blocks_up_to")] @@ -86,6 +86,39 @@ pub struct Finalized { #[derive(Deserialize, Debug)] pub struct AfgAuthoritySet { pub authority_id: Box, + pub authorities: Box, + pub authority_set_id: Box, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct AfgFinalized { + pub finalized_hash: BlockHash, + pub finalized_number: Box, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct AfgReceived { + pub target_hash: BlockHash, + pub target_number: Box, + pub voter: Box, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct AfgReceivedPrecommit { + #[serde(flatten)] + pub received: AfgReceived, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct AfgReceivedPrevote { + #[serde(flatten)] + pub received: AfgReceived, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct AfgReceivedCommit { + #[serde(flatten)] + pub received: AfgReceived, } impl Block { diff --git a/backend/src/types.rs b/backend/src/types.rs index c50d172..ee4f4a7 100644 --- a/backend/src/types.rs +++ b/backend/src/types.rs @@ -6,6 +6,7 @@ use crate::util::MeanList; pub type NodeId = usize; pub type BlockNumber = u64; pub type Timestamp = u64; +pub type Address = Box; pub use primitive_types::H256 as BlockHash; #[derive(Deserialize, Debug)]